Глава 10. Ввод/вывод и потоки данных: от блокировки к асинхронности и обратно.

Содержание.

Отличия от потока VCL и разработка интерфейса I/O (ввода/вывода).

Для рабочих потоков имеет смысл делать I/O блокирующим, так как обычно это самый простой путь. С точки зрения потока, использующего I/O ресурс c блокировкой, успех или неудача такого вызова проявляются немедленно, и при разработке логики программы не нужно заботиться о периоде времени между началом операции с I/O и ее завершением.

Выполнение главного потока VCL обычно нельзя блокировать на длинные периоды времени: поток должен всегда быть способен обработать новые сообщения с минимальной задержкой. Обычно дисковые I/O операции блокируют поток, поскольку задержки для конечного пользователя невелики, но другие I/O операции делают асинхронными, особенно действия по связи между потоками, процессами, или машинами, так как величину этих задержек нельзя предсказать заранее. Преимущество асинхронных операций, как уже говорилось прежде, состоит в том, что код, исполняемый потоком VCL, всегда способен воспринимать новые сообщения. Основной же недостаток - в том, что код, выполняющийся в потоке VCL, должен следить за статусом завершения всех незаконченных I/O операций. Это может быть довольно трудно, учитывая расходы на хранение возможно многочисленных описаний состояния. Иногда приходится создавать машину состояний, особенно при выполнении предопределенных протоколов, например, HTTP, FTP или NNTP. Чаще, конечно, проблема не настолько сложна, и должно быть решена разово. В таких случаях достаточно разработать конкретное решение.

При разработке набора функций передачи данных следует учитывать это различие . Если, например, рассмотреть взаимодействие, то самый общий набор операций для коммуникационного канала включает: Open, Close, Read и Write. Блокирующие I/O интерфейсы предоставляют простые функции для реализации этих действий. Асинхронные интерфейсы обеспечивают четыре основные функции, и вдобавок до четырех уведомлений (notifications), инициируемых либо call back (обратным вызовом), либо по событию (event). Эти уведомления или показывают, что завершилась начатая операция, или что возможно ее повторить, или и то и другое. Пример интерфейса может быть таким:

Обзор.

Для лучшего понимания этой главы будет полезно рассмотреть существующий механизм передачи данных между потоками, отметив методы, с помощью которых он будет расширен. Я хотел бы настоятельно рекомендовать читателям проработать эту главу, несмотря на то, что в ней много кода, который следует изучить. Наиболее важный момент, на который нужно обратить внимание - многие детали реализации, хотя и полезны для тех, кто хочет писать функциональные программы, включающие эти методы, но не имеют первостепенной важности для желающих приобрести фундаментальное понимание описанных вопросов. До сих пор единственный механизм передачи данных, который мы рассматривали - ограниченный буфер, который можно представить таким образом:

В этой главе будут продемонстрированы различные расширения этого буфера. Первая модификация будет довольно проста: установить два буфера комплементарно и добавить неблокирующую операцию извлечения с обоих сторон получившегося двунаправленного буфера.

Хорошо. Это не должно вызвать удивления у читателей, и тем, кто дошел до этого момента, прорабатывая весь материал, нетрудно будет реализовать такую конструкцию. Следующая модификация более существенная: вместо проведения операций чтения и записи с блокированием буфера, сделаем один набор действий асинхронным:
Точнее, мы создадим компонент, который преобразует блокирующие действия в асинхронные и наоборот. В данном случае он будет просто инкапсулировать операции чтения и записи для двунаправленного буфера, но при желании можно перекрыть эту функциональность, чтобы преобразовывать различные действия ввода/вывода между блокировочным и асинхронным режимами.
Появляется вопрос: Почему? Ответ должен быть очевиден: Если мы делаем буфер, обеспечивающий двунаправленное сообщение между двумя потоками, где один поток использует блокировку, а другой поток- асинхронные операции, то:

Реализация компонента преобразования блокировка-асинхронность.

Компонент, который мы создадим, предполагает, что выполняется только один поток VCL, так что асинхронный интерфейс будет предусмотрен только для одного потока. Операции блокировки, обеспечиваемые этим буфером, будут работать с точно теми же ограничениями, как и в примере ограниченного буфера в предыдущей главе, и следовательно, любое число блокирующих потоков будет способно параллельно иметь доступ к блокировочному интерфейсу. Подобно тому, как ограниченный буфер допускал простые операции Get и Put, вовлекающие только один элемент, для блокировочно-асинхронного буфера (в дальнейшем называемого БАБ), также допустимы простые действия, включающие только один элемент. Семантика интерфейса будет такой:

Добавление к ограниченному буферу операций просмотра.

Здесь модификация ограниченного буфера, реализующая операции просмотра. Обратите внимание, что хотя и возможно считывать счетчики семафоров во время определенных действий, я решил сохранять счетчики вручную, используя дополнительные переменные FEntryCountFree и FEntryCountUsed. Для чтения этих переменных предусмотрены новые методы. Многие программисты на Delphi могут сразу подумать об объявлении этих атрибутов буфера как свойств. К несчастью, мы должны учесть, что операции синхронизации, требующиеся для доступа к этим переменным могут потерпеть неудачу. Кажется более подходящим оставить операции просмотра в виде функций, таким образом напоминая программисту, что требуется некоторая работа по доступу к необходимым данным, и что функция может вернуть ошибку. Некоторые могут посчитать, что при таком обосновании будет иметь смысл также оформить атрибут буфера Size как явную функцию чтения. Это скорее вопрос стиля, так как размер буфера можно читать непосредственно без потребности в какой-либо синхронизации.

Создание двунаправленного ограниченного буфера.

Эта операция почти тривиальна и не требует сложного объяснения. Я осуществил ее как простую инкапсуляцию двух ограниченных буферных объектов. Все действия, поддерживаемые ограниченным буфером, также поддерживаются и двунаправленным буфером, с той небольшой модификацией, что поток, использующий этот объект, должен определить, с какой частью буфера он хочет иметь дело. Обычно один поток будет работать с частью A, а другой - с частью B. Здесь код. Этот класс реализует функциональность, описанную графически на вышеприведенной диаграмме, представляющей БАБ.

Детальное рассмотрение Блокировочно-Асинхронного Буфера (БАБ).

Сделав всю подготовительную работу, теперь можно рассказать о БАБ более подробно. Он содержит двунаправленный буфер и два потока, один чтения и один записи. Эти потоки выполняют действия записи и чтения с ограниченным буфером от имени потока VCL. Выполнение всех потоков можно представить графически с некоторыми отклонениями от существующих соглашений:

Эта диаграмма выглядит довольно устрашающе, так что, видимо, легче рассмотреть рабочий пример. Давайте возьмем тот случай, когда рабочий поток выполняет блокирующую запись в БАБ.

  1. Рабочий поток выполняет блокирующую запись.
  2. Поток чтения БАБ в данный момент блокирован при попытке прочесть из двунаправленного буфера. В результате записи он разблокируется и успешно проводит чтение.
  3. Он копирует прочитанные данные во вспомогательный буфер, локальный для класса потока, и генерирует событие прохождения данных, обрабатываемое БАБ.
  4. Код обработки события прохождения данных БАБ, исполняемый в контексте потока чтения, посылает сообщение дескриптору своего окна, свидетельствуя, что данные были успешно прочитаны потоком чтения.
  5. Поток чтения теперь ждет семафора, который сигнализирует о том, что данные прочитаны главным потоком VCL.
  6. Несколько позже главный поток VCL обрабатывает сообщения из очереди, предназначенные компоненту, точно таким же образом, как и для всех компонентов, имеющих дескриптор окна.
  7. Среди сообщений для компонента есть сообщение уведомления, посланное потоком чтения. Оно обрабатывается и генерирует событие OnRead для компонента.
  8. Событие OnRead обрабатывается логикой приложения (вероятно, главной формой), и в результате, очевидно, поток VCL пытается прочитать данные.
  9. Поток VCL вызывает метод БАБ AsyncRead.
  10. AsyncRead копирует данные из вспомогательного буфера в главный поток VCL. Затем он освобождает семафор, которым блокирован поток чтения, разрешая ему попытаться осуществить новую операцию чтения из двунаправленного буфера.
Точно так же БАБ работает и при записи. Она осуществляется асинхронно потоком VCL, внутреннй поток записи БАБ пробуждается и проводит блокирующую запись в двунаправленный буфер, и по окончании записи поток VCL уведомляется через событие, что можно пытаться делать новые операции записи.

В сущности интерфейс между блокировкой и асинхронной операцией через отправления сообщения идентичен тому, который был неформально введен в более ранних примерах. Отличие для этого компонента в том, что детали спрятаны от конечного пользователя, и проблема решена более формальным, хорошо определенным способом.

Здесь код этого компонента. Некоторые моменты стоит отметить особо. Обычно потомки TThread нечасто используют наследование, однако в данном случае потоки чтения и записи имеют много общего в своей функциональности, которая и реализована в базовом классе TBlockAsyncThread. Он содержит:

Базовый класс потока также обеспечивает необходимый минимум общей функциональности: создание потока,его уничтожение, и переключатель события для OnDataFlow. У базового класса есть два потомка: TBAWriterThread и TBAReaderThread. Они реализуют конкретные методы выполнения потока, а также предоставляют методы чтения и записи, которые косвенно будут исполняться потоком VCL. Сам компонент БАБ хранит двунаправленный буфер и два потока. Кроме того, в нем также хранится дескриптор окна FHWND, который используется для специализированной обработки сообщений.

Создание БАБ.

Давайте теперь кратко познакомимся с реализацией. При создании компонент БАБ получает дескриптор окна, используя AllocateHWnd. Эта полезная функция описана в книге Danny Thorpe "Создание компонентов Delphi". Компонент БАБ довольно необычен, потому что ему требуется дескриптор окна для выполнения обработки сообщения, но в действительности он - не визуальный компонент. Возможно дать компоненту БАБ дескриптор окна, сделав его потомком TWinControl. Тем не менее это неподходящий родитель для такого компонента, поскольку БАБ не является оконным элементом. Используя AllocateHWnd, компонент может осуществлять собственную обработку сообщений, не приобретая большого количества бесполезных для него методов. Обеспечивается также некоторое улучшение эффективности благодаря тому, что процедура обработки сообщений компонента выполняет только минимальную работу, имея дело лишь с одним сообщением, и игнорируя все другие.

При создании компонент БАБ также устанавливает обработчики событий от потоков себе же. Эти обработчики выполняются в контексте потоков чтения и записи и осуществляют отправление уведомления, которое связывает потоки чтения, записи, и основной поток VCL.

В результате создания компонента инициализируются потоки. Все действия, требуемые для этого - общие для потоков чтения и записи, так что они находятся в конструкторе TBlockAsyncThread. При этом просто устанавливается критическая секция, требующаяся для обеспечения атомарного доступа к промежуточному буферу в каждом потоке, а также создается семафор ожидания для каждого потока, который гарантирует, что рабочий поток будет ожидать поток VCL перед проведением операций чтения или записи.

Разрушение БАБ.

Уничтожение компонента несколько сложнее, но использует принципы, обсужденные в предыдущих главах. Двунаправленный буфер, содержащийся в БАБ, подобен ограниченному буферу, рассмотренному в последней главе, и его разрушение проходит в три стадии. Первый этап - разблокировка всех потоков, выполняющих действия по вводу/выводу в буфер путем вызова ResetState. Второй этап - ожидание, пока все потоки не остановятся, или по крайней мере, будут в том состоянии, в котором они не выполняют больше действий с буфером. Как только это условие выполнено, может начаться третий этап, на котором уничтожаются физические структуры данных.

Разрушение БАБ, таким образом, работает так:

Так как потоки внутренние для БАБ, у этих процедур очистки имеется тот положительный эффект, что деструктор БАБ может разблокировать и очистить все потоки и объекты синхронизации внутри компонента, а пользователю компонента не надо заботиться о потенциальных проблемах, присущих операциям очистки. Достаточно просто вызвать Free для БАБ. Это, очевидно, весьма желательно.

Несмотря не это, компонент все-таки дает доступ к методу ResetState. Причина в том, что компонент не контролирует другие рабочие потоки, которые могут проводить блокирующие операции с буфером. В подобных ситуациях главное приложение обязано само завершить рабочие потоки, сбросить состояние БАБ, и дождаться завершения рабочих потоков до физического разрушения БАБ.

Пример программы с использованием БАБ.

Вот еще один вариант программы для простых чисел. Главная форма запрашивает у пользователя два числа - начало и конец диапазона. Эти числа поступают в структуру запроса, а указатель на нее асинхронно записывается в БАБ. Несколько позже рабочий поток проводит блокирующее чтение и извлекает запрос. Затем он тратит какое-то (различное) время на обработку запроса, определяя, какие числа из этого диапазоне простые. По окончании обработки он выполняет блокирующую запись, передавая указатель на список строк, заполненный результатами. Основная форма оповещается, что в компоненте есть данные, для чтения, и тогда она читает список строк из БАБ и копирует результаты в Memo.

Стоит отметить два основных момента кода основной формы. Первый состоит в том, что интерфейс пользователя корректно обновляется в соответствии с управлением потоками данных для буфера. После того, как запрос будет подан, кнопка запроса будет запрещена. Она восстанавливается, только когда форма получает от БАБ событие OnWrite, указывающее, что можно безопасно записывать новые данные. Данная реализация устанавливает размер двунаправленного буфера 4. Это совсем немного, так что пользователь может убедиться, что после посылки четырех запросов, которые долго обрабатываются, кнопка остается запрещенной, пока один из запросов не обработан. Аналогично, если основная форма не может обработать уведомления о чтении из BAB достаточно быстро, рабочий поток будет заблокирован.

Второй момент - когда основная форма уничтожена, деструктор использует метод БАБ ResetState, как описано ранее, чтобы гарантировать корректную очистку потоков и освобождение буфера. Невозможность осуществления этого может привести к нарушению доступа (access violation).

Код рабочего потока довольно прост. Имеет смысл отметить, что поскольку он использует блокировку при чтении и записи, то использует процессор только при активной обработке запроса: если он не может получить запрос или послать ответ из-за перегрузки буфера, то он заблокирован.

Мы достигли нашей цели!

Небольшое резюме о том, чего мы добились с помощью этого компонента:

Читателя можно простить, если он думает, что все его проблемы позади...

Заметили утечку памяти?

Как в предыдущей, так и в этой главе мы уклонились от решения основной проблемы: элементы в созданных нами буферах не уничтожаются корректно при разрушении буфера. При первоначальном проектировании этих буферных структур, был принят подход, подобный используемому в TList: список или буфер только обеспечивает хранение и синхронизацию. Правильное распределение памяти для объектов и их освобождение - задача потока, использующего буфер.

Этот упрощенный метод создает существенные трудности. В общем случае чрезвычайно трудно проверить, что буфер обоих направлениях пустой перед его уничтожением. В вышеописанном примере, наиболее просто использующем буфер, есть четыре потока, четыре мьютекса или критические секции, и шесть семафоров для всей системы. Определение состояния всех потоков и обеспечение совершенно корректного выхода в такой ситуации, очевидно, невозможно.

В данном примере это было решено хранением счетчиков того, сколько запросов необслужены в каждый момент времени. Если мы получили столько же ответов, сколько было запросов, то мы можем быть уверены, что все буферы пусты.

Избавляемся от утечки.

Один подход - позволить разным буферам осуществлять callback-вызовы, которые во время очистки уничтожат различные объекты, содержащиеся в этих буферах. В общем случае это будет работать, но есть вероятность злоупотреблений, и реализация такой схемы, скорее всего, на практике получится неаккуратной.

Другая возможность - создать общую буферную схему управления, которая отслеживает конкретные типы объектов, храня информацию о том, когда они входят и выходят из разных буферов приложения. И опять же, реализация, вероятно, получится довольно сомнительной, и потребует довольно сложного механизм отслеживания ссылок, выполняя работу, которая должна быть в действительности проста.

Наилучшее решение - сделать буферные структуры аналогичными TObjectList, т.е. все элементы, помещаемые в буферы - экземпляры классов. Это позволит потоку, выполняющему операцию очистки, вызывать подходящий деструктор для каждого элемента в буфере. Даже более того - используя типы ссылок на класс, мы можем автоматически выполнять по время выполнения проверку типов объектов, проходящих через буфер, и создать безопасный по отношению к типу набор буферов.

Реализация подобной схемы оставим как упражнение читателю. Не требуется никаких изменений в основных механизмах синхронизации, но потребуют модификации процедуры для чтения и записи и реализация деструкторов для ограниченного буфера и классов потоков.

Проблемы просмотра.

При реализации двунаправленного буфера возможно обеспечить достаточно разумный механизм для просмотра буферов, чтобы увидеть, сколько в них элементов. Возможно, что при просмотре двунаправленного буфера счетчики свободных и использованных элементов не всегда будут точны, поскольку оба действия не могут выполняться атомарно. Тем не менее, гарантировано, что при только одном потоке чтения и записи в каждом направлении операции просмотра могут быть использованы как верный признак того, что действие будет успешным, без блокировки.

С асинхронным буфером проблема сложнее, потому что при данной реализации невозможно обеспечить гарантированно правильный контроль за состоянием буфера. Дело в том, что есть по существу два буфера в каждом направлении, ограниченный буфер и вспомогательный, для единственного элемента. Не предусмотрено никакого механизма для глобальной блокировки обоих буферов, чтобы можно было атомарно определять их статус.

В компоненте сделана попытка предоставить некоторые возможности просмотра, обеспечивая грубый подсчет элементов, проходящих через буферы, причем сделанно это преднамеренно, чтобы программист не заблуждался, думая, что результаты могут быть точны! Возможно ли сделать лучше?

Промежуточный буфер.

Наилучший путь улучшить ситуацию - полностью удалить промежуточный буфер. Это вполне возможно, если немного подумать, но требует переписывания всего кода буферизации. Мы должны сделать новый ограниченный буфер с несколько другой семантикой. Этот новый ограниченный буфер должен:

При таком способе можно использовать потоки чтения и записи, чтобы посылать уведомления, блокируя их, пока действие не будет возможным, а поток VCL мог бы выполнять фактические операции чтения и записывать в ограниченный буфер без блокировки.

С подобной семантикой унас будет только один набор буферов, которыми нужно управлять, и сравнительно легко обеспечить атомарное действие просмотра, которое даст точные результаты. Оставим реализацию читателю как упражнение...

Различные ограничения.

Для всех буферных структур, описанных в последних главах, считается, что программист посылает указатели на правильную память, а не NIL. Некоторые читатели могут обратить внимание, что часть кода потоков чтения и записи неявно подразумевает, что NIL не будет послан через буфер. Это несложно исправить с помощью введения флагов достоверности буфера , но за счет некоторого ухудшения кода.

Другое (скорее теоретическое) ограничение состоит в том, что конечный пользователь этого компонента может, в принципе, создать очень большое число буферов. Осеовные принципы программирования потоков для Win32 говорят, что обычно стоит ограничить количество потоков примерно до шестнадцати на приложение, что позволяет использовать восемь компонентов БАБ. Поскольку нет ограничений на число рабочих потоков, выполняющих операции блокировки для БАБ, кажется разумным иметь только один БАБ для приложения и использовать для связи между одним потоком VCL и всеми рабочими потоками. При этом, конечно, подразумевается, что все рабочие потоки выполняет одну и ту же работу. Чаще всего такой вариант вполне приемлем, так как большинство приложений Delphi должны порождать много потоков лишь при выполнении фоновых операций, требующих существенных затрат времени.

Обратная сторона монеты: Потоковые буферы.

Итак, все обсуждаемые буферные структуры использовали буферы указателей для передачи данных. Хотя это и полезно для дискретных операций, большинство I/O операций используют потоки данных. Все буферные структуры имеют приблизительный эквивалент, а именно, потоки данных, которые, в общем и целом, могут обрабатываться аналогичными способами. Есть несколько различий, которые имеет смысл отметить:

Есть еще много интересного, что можно было бы обсуждать по этой теме. Если читатель хочет увидеть рабочий пример потоковой буферизации, он может обратиться к коду в последней главе.

[Содержание] [Назад][Вперед]

© Martin Harvey 2000.
 

Hosted by uCoz