Энтони Уильямс - Параллельное программирование на С++ в действии. Практика разработки многопоточных программ
- Название:Параллельное программирование на С++ в действии. Практика разработки многопоточных программ
- Автор:
- Жанр:
- Издательство:ДМК Пресс
- Год:2012
- Город:Москва
- ISBN:978-5-94074-448-1
- Рейтинг:
- Избранное:Добавить в избранное
-
Отзывы:
-
Ваша оценка:
Энтони Уильямс - Параллельное программирование на С++ в действии. Практика разработки многопоточных программ краткое содержание
Книга «Параллельное программирование на С++ в действии» не предполагает предварительных знаний в этой области. Вдумчиво читая ее, вы научитесь писать надежные и элегантные многопоточные программы на С++11. Вы узнаете о том, что такое потоковая модель памяти, и о том, какие средства поддержки многопоточности, в том числе запуска и синхронизации потоков, имеются в стандартной библиотеке. Попутно вы познакомитесь с различными нетривиальными проблемами программирования в условиях параллелизма.
Параллельное программирование на С++ в действии. Практика разработки многопоточных программ - читать онлайн бесплатно полную версию (весь текст целиком)
Интервал:
Закладка:
if (pop_task_from_local_queue(task) || ←
(7)
pop_task_from_pool_queue (task) || ←
(8)
pop_task_from_other_thread_queue(task)) { ←
(9)
task();
} else {
std::this_thread::yield();
}
}
};
Этот код очень похож на код из листинга 9.6. Первое отличие состоит в том, что локальная очередь каждого потока — объект класса work_stealing_queue
, а не просто std::queue<>
(2). Новый поток не выделяет очередь для себя самостоятельно; это делает конструктор пула потоков (6), и он же сохраняет новую очередь в списке очередей для данного пула (1). Индекс очереди в списке передаётся функции потока и используется затем для получения указателя на очередь (3). Это означает, что пул потоков может получить доступ к очереди, когда пытается занять задачу для потока, которому нечего делать. Новая версия run_pending_task()
сначала пытается получить задачу из очереди исполняемого потока (7), затем из очереди пула (8)и, наконец, из очереди другого потока (9).
Функция pop_task_from_other_thread_queue()
(4)обходит очереди, принадлежащие всем потокам пула, пытаясь занять задачу у каждой. Чтобы не случилось так, что все потоки занимают задачи у первого потока в списке, каждый поток начинает просмотр с позиции, равной его собственному индексу (5).
Теперь у нас имеется пул потоков, пригодный для самых разных целей. Разумеется, есть масса способов улучшить его для работы в конкретной ситуации, но это я оставляю в качестве упражнения для читателя. В частности, мы совсем не исследовали идею динамического изменения размера пула, так чтобы обеспечить оптимальное использование процессоров, даже когда потоки блокированы в ожидании какого-то события, например, завершения ввода/вывода или освобождения мьютекса.
Следующим в нашем списке «продвинутых» приёмов управления потоками стоит прерывание потоков.
9.2. Прерывание потоков
Часто бывает необходимо сообщить долго работающему потоку о том, что пришло время остановиться. Например, потому что это рабочий поток пула, а мы собираемся уничтожить сам пул, или потому что пользователь отменил работу, выполняемую этим потоком. Причин миллион. Но идея в любом случае одна и та же: послать из одного потока другому сигнал с требованием прекратить работу до ее естественного завершения, и сделать это так, чтобы поток завершился корректно, а не просто выбить почву у него из-под ног.
Можно было бы придумывать такой механизм специально для каждого случая, но это, пожалуй, перебор. Мало того что общий механизм в дальнейшем упростит написание кода, так он еще и позволит писать код, допускающий прерывание, не заботясь о том, где конкретно он используется. Стандарт C++11 такого механизма не предоставляет, но реализовать его самостоятельно не слишком сложно. Я покажу, как это сделать, но сначала взгляну на проблему с точки зрения интерфейса запуска и прерывания потока, а не с точки зрения самого прерываемого потока.
9.2.1. Запуск и прерывание другого потока
Начнем с рассмотрения внешнего интерфейса. Что нам нужно от допускающего прерывание потока? На самом элементарном уровне интерфейс должен быть таким же, как у std::thread
, но с дополнительной функцией interrupt()
:
class interruptible_thread {
public:
template
interruptible_thread(FunctionType f);
void join();
void detach();
bool joinable() const;
void interrupt();
};
В реализации можно было бы использовать std::thread
для управления потоком и какую-то структуру данных для обработки прерывания. А как это выглядит с точки зрения самого потока? Как минимум, нужна возможность сказать: «Меня можно прерывать здесь», то есть нам требуется точка прерывания . Чтобы не передавать дополнительные данные, соответствующая функция должна вызываться без параметров: interruption_point()
. Отсюда следует, что относящаяся к прерываниям структура данных должна быть доступна через переменную типа thread_local
, которая устанавливается при запуске потока. Поэтому, когда поток обращается к функции interruption_point()
, та проверяет структуру данных для текущего исполняемого потока. С реализацией interruption_point()
мы познакомимся ниже.
Флаг типа thread_local
— основная причина, по которой мы не можем использовать для управления потоком просто класс std::thread
; память для него нужно выделить таким образом, чтобы к ней имел доступ как экземпляр interruptible_thread
, так и вновь запущенный поток. Для этого функцию, переданную конструктору, можно специальным образом обернуть перед тем, как передавать конструктору std::thread
. Как это делается, показано в следующем листинге.
Листинг 9.9.Простая реализация interruptible_thread
class interrupt_flag {
public:
void set();
bool is_set() const;
};
thread_local interrupt_flag this_thread_interrupt_flag; ←
(1)
class interruptible_thread {
std::thread internal_thread;
interrupt_flag* flag;
public:
template
interruptible_thread(FunctionType f) {
std::promise p; ←
(2)
internal_thread = std::thread([f,&p] { ←
(3)
p.set_value(&this_thread_interrupt_flag);
f(); ←
(4)
});
flag = p.get_future().get(); ←
(5)
}
void interrupt() {
if (flag) {
flag->set(); ←
(6)
}
}
};
Переданная функция f
обертывается лямбда-функцией (3), которая хранит копию f
и ссылку на локальный объект-обещание p
(2). Перед тем как вызывать переданную функцию (4), лямбда-функция устанавливает в качестве значения обещания адрес переменной this_thread_interrupt_flag
(объявленной с модификатором thread_local
(1)) в новом потоке. Затем вызывающий поток дожидается готовности будущего результата, ассоциированного с обещанием, и сохраняет этот результат в переменной-члене flag
(5). Отметим, что лямбда-функция исполняется в новом потоке и хранит висячую ссылку на локальную переменную p
, но ничего страшного в этом нет, так как конструктор interruptible_thread
ждет, пока на p
не останется ссылок в новом потоке, и только потом возвращает управление. Еще отметим, что эта реализация не обрабатывает присоединение или отсоединение потока. Мы сами должны позаботиться об очистке переменной flag
в случае выхода или отсоединения потока, чтобы избежать появления висячего указателя.
Теперь написать функцию interrupt()
несложно: имея указатель на флаг прерывания, мы знаем, какой поток прерывать, поэтому достаточно просто поднять этот флаг (6). Что делать дальше, решает сам прерываемый поток. О том, как принимается это решение, мы и поговорим ниже.
Интервал:
Закладка: