Глава 9. Семафоры. Управление потоками данных. Взаимосвязь источник-приемник.

Содержание:

Семафоры.

Семафор представляет собой другой тип примитива синхронизации, с несколько более широкими возможностями по сравнению с мьютексом. В наиболее простых случаях его можно использовать точно так же, как и мьютекс. В общем же семафоры позволяют реализовать в программе более продвинутые механизмы синхронизации.

Сначала давайте вспомним, как работают мьютексы. Мьютекс может быть или занят или свободен (signalled). Если он свободен, то действие ожидания мьютекса не блокируется. Если он занят, операция ожидания этого мьютекса блокирована. Если мьютекс занят, то он принадлежит конкретному потоку, и, следовательно, только один поток может обладать мьютексом в каждый момент времени.

Семафоры можно заставить действовать точно так же. Вместо понятия владения, захвата мьютекса, у семафора имеется счетчик. Когда этот счетчик больше нуля, семафор свободен, и операции ожидания для него не блокируются. Когда счетчик равен 0, то семафор занят, и операции ожидания заблокированы. Мьютекс по существу является разновидностью семафора, счетчик которого может быть только 0 или 1. Аналогично, семафоры можно рассматривать как воображаемые мьютексы, которые могут одновременно иметь более одного владельца. Функции Win32 API, работающие с семафорами, очень похожи на функции для работы с мьютексами.

Следующая таблица показывает, как превратить код, использующий мьютексы, в код с применением семафоров, и эквивалентные операции.
 
Мьютексы.
Семафоры.
MyMutex := CreateMutex(nil,FALSE,<name>); MySemaphore := CreateSemaphore(nil,1,1,<name>);
MyMutex := CreateMutex(nil,TRUE,<name>); MySemaphore := CreateSemaphore(nil,0,1,<name>);
WaitForSingleObject(MyMutex,INFINITE); WaitForSingleObject(MySemaphore,INFINITE);
ReleaseMutex(MyMutex); ReleaseSemaphore(MySemaphore,1);
CloseHandle(MyMutex); CloseHandle(MySemaphore);

Вот простой пример: здесь изменения, требующиеся для кода, представленного в 6 Главе, чтобы программа использовала семафоры вместо критических секций.

Счетчик больше единицы? "Не вполне критические" секции.

Возможность семафоров иметь счетчик более единицы отчасти аналогична разрешению мьютексу иметь более одного владельца. Таким образом семафоры позволяют создавать критические секции, который разрешают определенному количеству потоков иметь доступ к одному конкретному участку кода или к конкретному объекту. Это по большей части полезно в ситуациях, когда коллективный ресурс состоит из множества буферов или множества потоков, которые можно использовать и другими потоками системы. Давайте рассмотрим конкретный пример и предположим, что до трех потоков могут работать с определенным участком кода. Семафор создается с начальным и максимальным счетчиком 3, и предположим, что ни один поток не работает с критическим участком. Выполнение пяти потоков, пытающихся получить доступ к коллективному ресурсу, может выглядеть приблизительно так:

Это конкретное применение семафоров, вероятно, не особенно полезно для программистов на Delphi, главным образом потому, что есть несколько подобных статических структур для уровня приложения. Тем не менее, оно оказывается значительно более важным для ОС, где дескрипторы или ресурсы, такие как системные буферы, вероятно будут статически распределены во время загрузки.

Новое применение семафоров: управление потоками данных.

В Главе 6 было указано на потребность в управлении потоками данных при их прохождении между программными потоками . Кроме того, в Главе 8 эта тема была затронута при обсуждении мониторов. В данной главе рассматривается ситуация для примера, где часто требуется управление потоками данных: ограниченный буфер с единственным потоком-поставщиком данных, выводящий некоторые элементы в буфер, и единственный поток-потребитель, забирающий их оттуда.

Ограниченный буфер.

Ограниченный буфер представляет собой простую разделяемую структуру данных, которая обеспечивает и управление потоками данных, и общий доступ к данным. Буфер, рассмотренный здесь, будет простой очередью: первым вошел - первым вышел (FIFO). . Это будет реализовано в виде циклического буфера, то есть содержать фиксированное количество элементов и иметь два указателя "get" и "put", показывающие, в каком именно месте буфера данные будут вставлены и удалены. Обычно разрешается четыре операции с буфером:

Очевидно, для обращения с коллективными данными потребуются мьютексы. Тем не менее, мы можем использовать семафоры для выполнения необходимых операции блокировки, когда буфер полон или пуст, устраняя потребность в контроле выхода за границы, и даже подсчете того, сколько элементов находится в буфере. Для того, чтобы это сделать, потребуется некоторое изменение концепций. Вместо ожидания семафора и затем освобождения его при выполнении операций, имеющих отношение к буферу, мы используем счетчик двух семафоров, чтобы следить за тем, сколько входов в буфере пусты или заполнены. Давайте назовем эти семафоры "EntriesFree" и "EntriesUsed".

Обычно с буфером взаимодействуют два потока. Поток-производитель (или писатель) пытается вставить данные в буфер, а поток-потребитель (читатель) пытается их извлечь, как показано на следующей диаграмме. Третий поток (возможно, поток VCL), может вмешиваться для того, чтобы создавать и уничтожать буфер.

Как можно видеть, потоки - читатель и писатель - выполняются в цикле. Поток-писатель создает элемент и пытается поместить его в буфер. Сначала поток ожидает семафора EntriesFree. Если счетчик EntriesFree нулевой, то поток будет заблокирован, так как буфер полный, и больше данных добавить нельзя. Как только это возможное ожидание закончится, поток добавляет элемент в буфер, а затем увеличивает счетчик EntriesUsed, и, если необходимо, активирует поток-потребитель. Соответственно поток-потребитель заблокируется, если счетчик EntriesUsed нулевой, а когда он удаляет элемент, то увеличивает счетчик EntriesFree, разрешая потоку-производителю добавлять новый элемент.

Блокировка нужного потока всякий раз, когда буфер становится полным или пустым, оставляет один или другой поток "вне игры". При данном размере буфера N, поток-производитель может только быть на N элементов впереди потока-потребителя до своей остановки, и аналогично, поток-потребитель не может отставать более, чем на N элементов. Это дает несколько преимуществ:

Для полной ясности я дам пример последовательности событий. У нас есть буфер, который имеет 4 возможных входа, и инициализируется он так, что все элементы свободны. Возможно много путей выполнения в зависимости от работы планировщика, но я проиллюстрирую тот, где каждый поток выполняется столько, сколько возможно, прежде чем он будет приостановлен.
 

Действия потока-читателя
Действия потока-писателя
Число свободных элементов
Число занятых элементов
Thread starts
Thread inactive (not scheduled)
4
0
Wait(EntriesUsed) blocks. Suspended.
 
4
0
 
Wait(EntriesFree) flows through
3
0
 
Item Added. Signal(EntriesUsed)
3
1
 
Wait(EntriesFree) flows through
2
1
 
Item Added. Signal(EntriesUsed)
2
2
 
Wait(EntriesFree) flows through
1
2
 
Item Added. Signal(EntriesUsed)
1
3
 
Wait(EntriesFree) flows through
0
3
 
Item Added. Signal(EntriesUsed)
0
4
 
Wait(EntriesFree) blocks. Suspended
0
4
Wait(EntriesUsed) completes
 
0
3
Item Removed. Signal(EntriesFree)
 
1
3
Wait(EntriesUsed) flows through
 
1
2
Item Removed. Signal(EntriesFree)
 
2
2
Wait(EntriesUsed) flows through
 
2
1
Item Removed. Signal(EntriesFree)
 
3
1
Wait(EntriesUsed) flows through
 
3
0
Item Removed. Signal(EntriesFree)
 
4
0
Wait(EntriesUsed) blocks. Suspended
 
4
0

Реализация ограниченного буфера в Delphi.

Здесь первая реализация ограниченного буфера на Delphi. Как обычно, появляется несколько вопросов, на которые следует обратить внимание, и несколько проблем, которые будут решены позже.

Создание: Корректная инициализация счетчиков семафоров.

При такой реализации ограниченного буфера данные хранятся как массив указателей с индексами чтения и записи в этот массив. В целях отладки я сделал так, что если буфер содержит N элементов, то он будет объявлен полным, когда заполнено N-1элементов. Такая задача чаще всего решается с помощью циклического буфера, где индексы чтения и записи сравнивают для определения, полон буфер или нет. Если буфер пуст, индексы чтения и записи одинаковы. К несчастью, то же самое будет и для случая, если буфер совершенно заполнен, так что часто в коде циклического буфера делают один всегда пустой вход, что позволяет различить эти два условия. В нашем случае, поскольку мы используем семафоры, это не обязательно. Тем не менее, я решил соблюсти это соглашение для облегчения отладки.

Учитывая это, мы можем инициализировать семафор EntriesUsed нулем. Поскольку заполненных элементов нет, мы хотим, чтобы потоки-читатели сразу же были блокированы. По условию, мы хотим, чтобы потоки-писатели добавили в буферN-1 элементов, и поэтому инициализируем EntriesFree значением N-1.

Нам также нужно учитывать максимальный счетчик, разрешенный для семафоров. Процедура, которая уничтожает буфер, всегда выполняет действие SIGNAL (свободны) для обоих семафоров. Поэтому, когда буфер разрушается, в нем может находиться любое количество элементов, т.е. он может быть и полностью заполнен и совершенно пуст, и мы установили максимальный счетчик в N, допуская таким образом одно действие освобождения семафоров для всех возможных состояний буфера.

Работа: правильные времена ожидания.

Я использовал мьютексы вместо критических секций в этой части программы, поскольку они позволяют разработчику более точно контролировать ошибочные ситуации. Кроме того, они также допускают выход по таймауту. Время ожидания для wait-функций семафоров в самом деле должно быть бесконечным; возможно, что буфер остается полным или пустым в течение длинных периодов времени, и нам нужно заблокировать поток на столько, сколько буфер будет пуст. Параноидально настроенные или пишущие небезопасный код программисты могут использовать задержки на нескольких секунд для этих примитивов, чтобы учесть непредвиденные ошибочные ситуации, когда поток блокируется навсегда. Я достаточно уверен в своем коде, чтобы считать это необязательным, по крайней мере в данном случае...

Задержка для мьютекса - совсем другое дело. Операции в критической секции быстрые; до N записей в память, и, если обеспечить сравнительное небольшое N (то есть меньше миллиона), то эти действия не должны занять более 5 секунд. В качестве бесплатного приложения часть кода очистки захватывает этот мьютекс, а вместо освобождения его - закрывает дескриптор. При установке таймаута гарантируется, что потоки, ожидающие мьютекс, будут разблокированы, и вернут код ошибки.

Разрушение: Очистка.

К настоящему моменту большинство читателей уже понимают, что операции очистки часто являются наиболее трудными в многопоточном программировании. Ограниченный буфер не является исключением. Процедура ResetState выполняет очистку. Первое, что она делает - проверяет FBufInit. Я предположил, что при этом не требуется синхронизировать доступ, поскольку поток, который создает буфер, должен также и уничтожить его, а раз все делается одним потоком, и все операции записи происходят в критической секции (по крайней мере после создания), то никаких конфликтов не произойдет. Процедуре очистки теперь нужно проверить, что все правильно уничтожено, и что все потоки, находящиеся в ожидании, в процессе чтения или записи, завершаются корректно, сообщая в противном случае о неудаче.

Процедура очистки сначала захватывает мьютекс для разделяемых данных в буфере, затем разблокирует потоки чтения и записи, освобождая оба семафора. Операции производятся в этом порядке, поскольку, когда оба семафора свободны, состояние буфера больше не отражает истинного положения дел: счетчики семафоров не согласованы с содержимым буфера. Захватывая сначала мьютекс, мы можем уничтожить буфер раньше того, как разблокированные потоки приступят к его чтению. Уничтожая буфер, и устанавливая FBufInit в False, мы гарантируем, что разблокированные потоки вернут код ошибки, а не будут обращаться к неправильным данным.

Затем мы разблокируем оба потока, освободив оба семафора, а потом закрываем все дескрипторы синхронизации. После этого уничтожаем мьютекс, не освобождая его. Это не страшно, поскольку все действия ожидания мьютекса закончились, то мы можем быть уверены, что как поток чтения, так и поток записи в конечном счете разблокируются. Кроме того, так как имеется только по одному потоку - читателю и писателю, мы можем гарантировать, что никакие другие потоки в течение этого процесса не станут ожидать освобождения семафоров. Это означает, что одного действия по освобождению обоих семафоров было достаточно, чтобы возобновить все потоки, а поскольку мы уничтожаем дескрипторы семафоров, пока удерживаем мьютекс, то дальнейшие действия по записи или чтению обречены на неудачу, если при них будет попытка обращения к одному из семафоров.

Разрушение: тонкости остаются.

Этот код гарантированно работает только с одним потоком чтения, с одним потоком записи и одним управляющим. Почему?
Если существует более одного потока чтения или записи, то более, чем один поток может ожидать один из семафоров в любом момент времени. Следовательно, мы не могли бы активировать все ожидающие потоки-читатели или писатели, когда состояние буфера сброшено. Первой реакцией программиста на это может быть модификация подпрограммы очистки, чтобы продолжалось освобождение одного или другого семафора, пока все потоки не будут разблокированы, что можно сделать приблизительно так.  К несчастью, этого все еще недостаточно, поскольку один из повторяющихся циклов в процедуре очистки может завершиться прямо перед тем, как другой поток приступит к действиям чтения или записи и станет ожидать семафор. Очевидно, нам нужно нечто атомарное, но мы не можем ввести действия с семафором в критическую секцию, поскольку потоки, блокирующие семафор, останутся в критической секции, и вся потоки придут к тупику.

Доступ к дескрипторам синхронизации должен быть синхронизован!

Следующая возможность - обнулить дескриптор семафора прямо перед "отключением" его, сделав нечто подобное. (Автор будет честен и признает, что это жалкое неполноценное решение пришло ему в голову). Однако это ничем не лучше. Вместо проблемы тупика мы получим конфликт потоков непростого типа. Этот конфликт представляет собой запись после чтения для самого дескриптора семафора! Да... Вы должны синхронизировать даже ваши объекты синхронизации! Вот что может случиться: рабочий поток читает значение дескриптора мьютекса из буферного объекта и приостанавливается, ожидая; в этот момент поток очистки, уничтожающий буфер, освобождает мьютекс необходимое число раз, и именно в этот момент рабочий поток возобновляется и обращается к мьютексу, который, как мы считаем, только что был уничтожен! Интервал, в котором это может случиться, очень небольшой, но тем не менее, это решение неприемлемо.

Управление дескрипторами в Win32.

Эта проблема достаточно сложна, так что имеет смысл рассмотреть что именно происходит, когда мы запускаем функцию Win32 для закрытия мьютекса или семафора. В частности, полезное знать вот что:

Чтобы узнать это, мы можем использовать два текстовых приложения,  для мьютекса, и для семафора. С помощью этих программ можно определить, что при закрытии дескриптора объекта синхронизации Win32 не разблокирует потоки, ожидающие этот объект. Это, наиболее вероятно, происходит благодаря механизму подсчета ссылок, который Win32 использует, чтобы следить за дескрипторами: потоки, ожидающие объект синхронизации, могут поддерживать внутренний счетчик ссылок так, чтобы он не обнулялся, и закрывая дескриптор объекта для приложения, мы только лишаемся всякого управления этим объектом синхронизации. В нашей ситуации это серьезная проблема. В идеале при очистке хотелось бы надеяться, что попытка ожидания для закрытого дескриптора должна разблокировать потоки, ждущие освобождения данного объекта синхронизации через этот конкретный дескриптор. Это бы позволило программисту войти в критическую секцию, очистить данные в этой критической секции, затем закрыть дескриптор, таким образом разблокировав потоки, ожидающие данный объект с неким значением ошибки (возможно, WAIT_ABANDONED? (ждать, пока не исчезнет)).

Решение.

В результате всего этого мы определили, что закрытие дескрипторов необходимо, оно приводит к тому,что поток не будет выполнять ожидание дескриптора неопределенное время. Применяя это к ограниченному буферу при очистке, мы можем гарантированно разблокировать все потоки, ожидающие семафора, только если мы знаем, сколько потоков ожидают освобождения мьютексов. В общем случае нам нужно проверять, что потоки не выполняет бесконечного ожидания мьютексов. Здесь измененный буфер, который работает с любым количеством потоков. Функции ожидания семафоров в нем были модифицированы, процедура очистки также претерпела некоторые изменения.

Вместо выполнения бесконечного ожидания соответствующего мьютекса, потоки чтения и записи теперь вызывают функцию "Управляемого Ожидания". В этой функции каждый поток ждет семафор только определенное временя. Такое ожидание семафора может возвращать одно из трех значений, в соответствии с описанием в справке Win32.

Во-первых, если семафор освобожден, функция возвращает "успех", и никаких дальнейших действий не требуется... Во-вторых, в случае, когда функция Win32 WaitFor дает WAIT_ABANDONED, функция возвращает ошибку; это значение ошибки указывает, что поток завершается без корректного освобождения объекта синхронизации. Случай, который нам наиболее интересен - время ожидания истекло (WAIT_TIMEOUT). Это может быть по одной из двух возможных причин:

Для того, чтобы это проверить, мы пытаемся войти в критическую секцию и проверить, что переменная "буфер инициализирован" все еще True. Если любое из этих действий терпит неудачу, то мы знаем, что внутреннее содержимое буфера было разрушено, и функция завершается, возвратив ошибку. Если оба этих действия успешны, то мы возвращаемся в цикл, снова ожидая мьютекс. Эта функция гарантирует, что когда буфер разрушен, заблокированный поток в конечном счете выйдет по таймауту, и возвратит ошибку в вызвавший поток.

Подпрограмма очистки также немного модифицирована. Она теперь и освобождает семафоры, и мьютекс критической секции. Такой подход гарантирует, что первый поток-читатель и писатель разблокируются немедленно, как только состояние буфера будет сброшено. Конечно, дополнительным потокам, возможно, придется ждать завершения вплоть до истечения задержки, определенной по умолчанию.

Использование ограниченного буфера: пример.

Чтобы создать основу этого примера, было разработано простое приложение с использованием двух потоков. Эта программа ищет простые числа-палиндромы (перевертыши). Пара палиндромов существует, когда два числа, X и Y оба простые, и Y является палиндромом X. Ни X , ни Y не должныбыть палиндромами сами по себе, хотя это предсталяет собой особый случай X = Y. Примеры простых чисел - палиндромов включают: (101, 101), (131, 131) - это особый случай, а (16127, 72161) , (15737, 73751) и (15683, 38651) - не особый.

В сущности, два потока (исходный код) осуществляют слегка разные задачи. Первый поток ("прямой") ищет простые числа. При нахождении он помещает это число в ограниченный буфер. Второй поток ждет, пока в буфере будут элементы. Когда они появляются, он удаляет элемент, переворачивает цифры, проверяет, является ли обращенное число простым, и если это так, то посылает текстовую строку, содержащую два числа, главной форме (код формы).

Хотя кода довольно много, нового для обсуждения почти нет. Я советую читателю рассмотреть методы Execute каждого потока, так как они дают ясное представление о том, что происходит. Перенос данных из второго потока в поток VCL и соответствующие методы главной формы обсуждались в предыдущих главах. Остался только один момент, который стоит затронуть... вы, наверно, догадались! Освобождение ресурсов и очистка.

В завершение...

Вы наверно, подумали, не могло ли что-нибудь еще остаться недосказанным про разрушение? Надо упомянуть еще один тонкий момент. Код ограниченного буфера допускает, что потоки могут попытаться получить доступ к полям объекта после сброса буфера. Это хорошо, но это означает, что, уничтожая два потока и буфер между ними, мы должны сбросить состояние буфера, затем подождать завершения всех потоков, и только потом действительно разрушать буфер, освобождая таким образом память, содержащую сам объект. Если так не сделать, может произойти нарушение доступа (access violation). Функция StopThreads правильно это выполняет, гарантируя корректный выход.

В данный момент стоит упомянуть, что дополнительные проблемы с синхронизацией имеются для процедуры SetSize. В примере я считал, что размер буфера установлен единожды и навсегда, причем раньше, чем какой-либо поток станет использовать буфер. Возможно установить размер буфера и тогда, когда он используется. В общем, это плохая идея, поскольку означает, что если два потока используют буфер; один читатель и один писатель, то они не могут правильно обнаружить разрушение буфера. Если у буфера должен быть изменен размер, то все потоки, использующие буфер, должны быть остановлены или приостановлены в известной безопасной точке. Затем у буфера можно поменять размер, и перезапустить потоки записи и чтения. Некоторые программисты могут захотеть написать расширенную версию буфера, который корректно обрабатывает операции изменения размера.


[Содержание] [Вперед][Назад]

© Martin Harvey 2000.
 

Hosted by uCoz