Энтони Уильямс - Параллельное программирование на С++ в действии. Практика разработки многопоточных программ
- Название:Параллельное программирование на С++ в действии. Практика разработки многопоточных программ
- Автор:
- Жанр:
- Издательство:ДМК Пресс
- Год:2012
- Город:Москва
- ISBN:978-5-94074-448-1
- Рейтинг:
- Избранное:Добавить в избранное
-
Отзывы:
-
Ваша оценка:
Энтони Уильямс - Параллельное программирование на С++ в действии. Практика разработки многопоточных программ краткое содержание
Книга «Параллельное программирование на С++ в действии» не предполагает предварительных знаний в этой области. Вдумчиво читая ее, вы научитесь писать надежные и элегантные многопоточные программы на С++11. Вы узнаете о том, что такое потоковая модель памяти, и о том, какие средства поддержки многопоточности, в том числе запуска и синхронизации потоков, имеются в стандартной библиотеке. Попутно вы познакомитесь с различными нетривиальными проблемами программирования в условиях параллелизма.
Параллельное программирование на С++ в действии. Практика разработки многопоточных программ - читать онлайн бесплатно полную версию (весь текст целиком)
Интервал:
Закладка:
Затем функция submit()
проверяет, есть ли у текущего потока очередь работ (4). Если есть, то это поток из пула, и мы можем поместить задачу в локальную очередь. В противном случае задачу следует помещать в очередь пула, как и раньше (5).
Аналогичная проверка имеется в функции run_pending_task()
(6), только на этот раз нужно еще проверить, есть ли что-нибудь в локальной очереди. Если есть, то можно извлечь элемент из начала очереди и обработать его. Обратите внимание, что локальная очередь может быть обычным объектом std::queue<>
(1), так как к ней обращается только один поток. Если в локальной очереди задач нет, то мы проверяем очередь пула, как и раньше (7).
Таким образом мы действительно уменьшаем конкуренцию, но если распределение работ неравномерно, то может случиться, что в очереди одного потока скопится много задач, тогда как остальным будет нечем заняться. Например, в случае Quicksort только самый первый блок попадает в очередь пула, а остальные окажутся в локальной очереди того потока, который этот блок обработал. Это сводит на нет всю идею пула потоков.
К счастью, у этой проблемы есть решение: позволить потоку занимать (steal) работы из очередей других потоков, если ни в его собственной, ни в глобальной очереди ничего нет.
9.1.5. Занимание работ
Если мы хотим, чтобы «безработный» поток мог брать работы из очереди другого потока, то эта очередь должна быть доступна занимающему потоку в run_pending_tasks()
. Для этого каждый поток должен зарегистрировать свою очередь в пуле или получать очередь от пула. Кроме того, необходимо позаботиться о надлежащей синхронизации и защите очереди работ, чтобы не нарушались инварианты.
Можно написать свободную от блокировок очередь, которая позволит потоку-владельцу помещать и извлекать элементы с одного конца, а другим потокам — занимать элементы с другого конца, однако реализация такой очереди выходит за рамки данной книги. Чтобы продемонстрировать идею, мы поступим проще — воспользуемся мьютексом для защиты данных очереди. Мы надеемся, что занимание работ — редкое событие, поэтому конкуренция за мьютекс будет невелика, и накладные расходы на такую очередь окажутся минимальны. Ниже приведена простая реализация с блокировками.
Листинг 9.7.Очередь с блокировкой, допускающей занимание работ
class work_stealing_queue {
private:
typedef function_wrapper data_type;
std::deque the_queue; ←
(1)
mutable std::mutex the_mutex;
public:
work_stealing_queue() {}
work_stealing_queue(const work_stealing_queue& other)=delete;
work_stealing_queue& operator=(
const work_stealing_queue& other)=delete;
void push(data_type data) { ←
(2)
std::lock_guard lock(the_mutex);
the_queue.push_front(std::move(data));
}
bool empty() const {
std::lock_guard lock(the_mutex);
return the_queue.empty();
}
bool try_pop(data_type& res) { ←
(3)
std::lock_guard lock(the_mutex);
if (the_queue.empty()) {
return false;
}
res = std::move(the_queue.front());
the_queue.pop_front();
return true;
}
bool try_steal(data_type& res) { ←
(4)
std::lock_guard lock(the_mutex);
if (the_queue.empty()) {
return false;
}
res = std::move(the_queue.back());
the_queue.pop_back();
return true;
}
};
Этот класс является простой оберткой вокруг std::deque
(1), которая защищает все операции доступа к очереди с помощью мьютекса. Функции push()
(2)и try_ pop()
(3)работают с началом очереди, а функция try_steal()
— с концом (4).
Получается, что эта «очередь» для потока-владельца на самом деле является стеком, обслуживаемым согласно дисциплине «последним пришёл, первым обслужен», — задача, которая была помещена последней, извлекается первой. С точки зрения кэш-памяти это даже может повысить производительность, так как относящиеся к последней задаче данные с большей вероятностью окажутся в кэше, чем данные, относящиеся к предыдущей задаче. К тому же, такая дисциплина прекрасно подходит для алгоритмов типа Quicksort. В предшествующих реализациях каждое обращение к do_sort()
помещает элемент в очередь, а затем ждет его. Обрабатывая последний помещенный в очередь элемент первым, мы гарантируем, что блок, необходимый текущему вызову для завершения работы, будет обработан раньше блоков, нужных другим ветвям, а, значит, уменьшается как количество активных задач, так и занятый размер стека. Функция try_steal()
извлекает элементы из противоположного по сравнению с try_pop()
конца очереди, чтобы минимизировать конкуренцию; в принципе, можно было бы применить технику, обсуждавшуюся в главах 6 и 7, чтобы поддержать одновременные обращения к try_pop()
и try_steal()
.
Итак, теперь у нас есть замечательная очередь работ, допускающая занимание. Но как воспользоваться ей в пуле потоков? Ниже приведена одна из возможных реализаций.
Листинг 9.8.Пул потоков с использованием занимания работ
class thread_pool {
typedef function_wrapper task_type;
std::atomic_bool done;
thread_safe_queue pool_work_queue;
std::vector > queues;←
(1)
std::vector threads;
join_threads joiner;
static thread_local work_stealing_queue* local_work_queue;←
(2)
static thread_local unsigned my_index;
void worker_thread(unsigned my_index_) {
my_index = my_index_;
local_work_queue = queues[my_index].get(); ←
(3)
while (!done) {
run_pending_task();
}
}
bool pop_task_from_local_queue(task_type& task) {
return local_work_queue && local_work_queue->try_pop(task);
}
bool pop_task_from_pool_queue(task_type& task) {
return pool_work_queue.try_pop(task);
}
bool pop_task_from_other_thread_queue(task_type& task) { ←
(4)
for (unsigned i = 0; i < queues.size(); ++i) {
unsigned const index = (my_index + i + 1) % queues.size();←
(5)
if (queues[index]->try_steal(task)) {
return true;
}
}
return false;
}
public:
thread_pool() :
done(false), joiner(threads) {
unsigned const thread_count =
std::thread::hardware_concurrency();
try {
for (unsigned i = 0; i < thread_count; ++i) {
queues.push_back(std::unique_ptr (←
(6)
new work_stealing_queue));
threads.push_back(
std::thread(&thread_pool::worker_thread, this, i));
}
} catch (...) {
done = true;
throw;
}
}
~thread_pool() {
done = true;
}
template
std::future::type> submit(
FunctionType f) {
typedef typename std::result_of::type
result_type;
std::packaged_task task(f);
std::future res(task.get_future());
if (local_work_queue) {
local_work_queue->push(std::move(task));
} else {
pool_work_queue.push(std::move(task));
}
return res;
}
void run_pending_task() {
task_type task;
Интервал:
Закладка: