Энтони Уильямс - Параллельное программирование на С++ в действии. Практика разработки многопоточных программ
- Название:Параллельное программирование на С++ в действии. Практика разработки многопоточных программ
- Автор:
- Жанр:
- Издательство:ДМК Пресс
- Год:2012
- Город:Москва
- ISBN:978-5-94074-448-1
- Рейтинг:
- Избранное:Добавить в избранное
-
Отзывы:
-
Ваша оценка:
Энтони Уильямс - Параллельное программирование на С++ в действии. Практика разработки многопоточных программ краткое содержание
Книга «Параллельное программирование на С++ в действии» не предполагает предварительных знаний в этой области. Вдумчиво читая ее, вы научитесь писать надежные и элегантные многопоточные программы на С++11. Вы узнаете о том, что такое потоковая модель памяти, и о том, какие средства поддержки многопоточности, в том числе запуска и синхронизации потоков, имеются в стандартной библиотеке. Попутно вы познакомитесь с различными нетривиальными проблемами программирования в условиях параллелизма.
Параллельное программирование на С++ в действии. Практика разработки многопоточных программ - читать онлайн бесплатно полную версию (весь текст целиком)
Интервал:
Закладка:
Листинг 9.2.Пул потоков, ожидающий завершения задачи
class function_wrapper {
struct impl_base {
virtual void call() = 0;
virtual ~impl_base() {}
};
std::unique_ptr impl;
template
struct impl_type: impl_base {
F f;
impl_type(F&& f_): f(std::move(f_)) {}
void call() { f(); }
};
public:
template function_wrapper(F&& f):
impl(new impl_type(std::move(f))) {}
void operator()() { impl->call(); }
function_wrapper() = default;
function_wrapper(function_wrapper&& other):
impl(std::move(other.impl)) {}
function_wrapper& operator=(function_wrapper&& other) {
impl = std::move(other.impl);
return *this;
}
function_wrapper(const function_wrapper&) = delete;
function_wrapper(function_wrapper&) = delete;
function_wrapper& operator=(const function_wrapper&) = delete;
};
class thread_pool {
thread_safe_queue work_queue;←┐
│ Используем
void worker_thread() │ function_
{ │ wrapper
while (!done) │ вместо std::
{ │ function
function_wrapper task; ←┘
if (work_queue.try_pop(task))
task();
else
std::this_thread::yield();
}
}
public:
template
std::future::type>← (1)
submit(FunctionType f) {
typedef typename std::result_of::type
result_type; ← (2)
std::packaged_task task(std::move(f));← (3)
std::future res(task.get_future()); ← (4)
work_queue.push(std::move(task)); ← (5)
return res; ← (6)
}
// остальное, как и раньше
};
Прежде всего отметим, что модифицированная функция submit() (1)возвращает объект std::future<>, который будет содержать возвращенное задачей значение и позволит вызывающей программе ждать ее завершения. Для этого нам необходимо знать тип значения, возвращаемого переданной функцией f, и здесь на помощь приходит шаблон std::result_of<>: std::result_of::type— это тип результата, возвращенного вызовом объекта типа FunctionType(например, f) без аргументов. Выражение std::result_of<>мы используем также в определении псевдонима типа result_type (2)внутри функции.
Затем fобертывается объектом std::packaged_task (3), потому что f— функция или допускающий вызов объект, который не принимает параметров и возвращает результат типа result_type. Теперь мы можем получить будущий результат из std::packaged_task<> (4), перед тем как помещать задачу в очередь (5)и возвращать будущий результат (6). Отметим, что при помещении задачи в очередь мы должны использовать функцию std::move(), потому что класс std::packaged_task<> не допускает копирования. Именно поэтому в очереди хранятся объекты function_wrapper, а не объекты типа.
Этот пул позволяет ожидать завершения задач и получать возвращаемые ими результаты. В листинге ниже показано, как выглядит функция parallel_accumulate, работающая с таким пулом потоков.
Листинг 9.3.Функция parallel_accumulate, реализованная с помощью пула потоков, допускающего ожидание задач
template
T parallel_accumulate(Iterator first, Iterator last, T init) {
unsigned long const length = std::distance(first, last);
if (!length)
return init;
unsigned long const block_size = 25;
unsigned long const num_blocks =
(length + block_size - 1) / block_size; ← (1)
std::vector > futures(num_blocks-1);
thread_pool pool;
Iterator block_start = first;
for (unsigned long i = 0; i < (num_blocks - 1); ++i) {
Iterator block_end = block_start;
std::advance(block_end, block_size);
futures[i] = pool.submit(accumulate_block());← (2)
block_start = block_end;
}
T last_result =
accumulate_block()(block_start, last);
T result = init;
for (unsigned long i = 0; i < (num_blocks - 1); ++i) {
result += futures[i].get();
}
result += last_result;
return result;
}
Сравнивая этот код с листингом 8.4, следует обратить внимание на две вещи. Во-первых, мы работаем с количеством блоков ( num_blocks (1)), а не потоков. Чтобы в полной мере воспользоваться масштабируемостью пула потоков, мы должны разбить работу на максимально мелкие блоки, с которыми имеет смысл работать параллельно. Если потоков в пуле немного, то каждый поток будет обрабатывать много блоков, но по мере роста числа потоков, поддерживаемых оборудованием, будет расти и количество блоков, обрабатываемых параллельно.
Но, выбирая «максимально мелкие блоки, с которыми имеет смысл работать параллельно», будьте осторожны. Отправка задачи пулу потоков, выбор ее рабочим потоком из очереди и передача возвращенного значения с помощью std::future<>— всё это операции не бесплатные, и для совсем мелких задач они не окупятся. Если размер задачи слишком мал, то программа, в которой используется пул потоков, может работать медленнее, чем однопоточная.
В предположении, что размер блока выбран разумно, вам не надо заботиться об упаковке задач, получении будущих результатов и хранении объектов std::thread, чтобы впоследствии их можно было присоединить; все это пул берет на себя. Вам остается лишь вызвать функцию submit(), передав ей свою задачу (2).
Пул потоков обеспечивает также безопасность относительно исключений. Любое возбужденное задачей исключение передается через будущий результат, возвращенный submit(), а, если выход из функции происходит в результате исключения, то деструктор пула потоков снимет еще работающие задачи и дождется завершения потоков, входящих в пул.
Эта схема работает для простых случаев, когда задачи независимы. Но не годится, когда одни задачи зависят от других, также переданных пулу.
9.1.3. Задачи, ожидающие других задач
В этой книге я уже неоднократно приводил пример алгоритма Quicksort. Его идея проста — подлежащие сортировке данные разбиваются на две части: до и после опорного элемента (в смысле заданной операции сравнения). Затем обе части рекурсивно сортируются и объединяются для получения полностью отсортированной последовательности. При распараллеливании алгоритма надо позаботиться о том, чтобы рекурсивные вызовы задействовали имеющийся аппаратный параллелизм.
В главе 4, где этот пример впервые был представлен, мы использовали std::asyncдля выполнения одного из рекурсивных вызовов на каждом шаге и оставляли библиотеке решение о том, запускать ли новый поток или сортировать синхронно при обращении к get(). Этот подход неплохо работает — каждая задача либо выполняется в отдельном потоке, либо в тот момент, когда нужны ее результаты.
В главе 8 мы переработали эту реализацию, продемонстрировав альтернативный подход, когда количество потоков фиксировано и определяется уровнем аппаратного параллелизма. В данном случае мы воспользовались стеком ожидающих сортировки блоков. Разбивая на части предложенные для сортировки данные, каждый поток помещал один блок в стек, а второй сортировал непосредственно. Бесхитростное ожидание завершения сортировки второго блока могло бы закончиться взаимоблокировкой, потому что число потоков ограничено, и некоторые из них ждут. Очень легко оказаться в ситуации, когда все потоки ждут завершения сортировки блоков, и ни один ничего не делает. Тогда мы решили эту проблему, заставив поток извлекать блоки из стека и сортировать их, пока тот конкретный блок, которого он ждет, еще не отсортирован.
Читать дальшеИнтервал:
Закладка: