среда, 5 февраля 2020 г.

Мартин Клеппман. Высоконагруженные приложения (Designing Data-Intensive Applications)


Книга Мартина Клеппмана "Высоконагруженные приложения" (в оригинале Designing Data-Intensive Applications: The Big Ideas Behind Reliable, Scalable, and Maintainable Systems) может и должна стать настольной книгой современного разработчика программного обеспечения, который разрабатывает приложения, включающие какую-либо серверную/прикладную часть для хранения или обработки данных и использующие сеть для взаимодействия с другими компонентами системы.

Издание предназначено для разработчиков программного обеспечения, архитекторов ПО и технических директоров, которые в свободное время любят писать код. Книга особенно актуальна, если вам нужно принять решение об архитектуре систем, над которыми вы работаете, - например, выбрать инструменты для решения конкретной проблемы и придумать, как их лучше использовать. Но даже если у вас нет права выбирать инструменты, данная книга поможет понять их достоинства и недостатки.

В статье я приведу конспект книги. Я его написал, потому что при написании конспекта лучше усваивается материал. Надеюсь, что он кому-то поможет в принятии решения прочитать данную книгу. Текст получился достаточно объемный, но и полезного материала в книге масса.

Структура книги. Конспект

Книга является структурированным изложением идей, подходов и технологий, которые лежат в основе архитектур современных распределенных систем.

Основы информационных систем

В первой части обсуждаются базовые идеи, лежащие в основе проектирования Data Intensive Applications. Вначале обозначаются 3 основных нефункциональных требования, которые обычно предъявляются приложению: надежность, масштабируемость, удобство сопровождения - это отправная точка, от которой автор книги отталкивается и переходит к технологиям и паттернам проектирования, которые позволяют выполнить данные требования.

Погружение начинается с обзора моделей хранения данных и разновидностей баз данных, которые реализуют описанные модели. 
Исторически сначала данные представлялись в виде одного большого дерева (иерархическая модель), но такой вариант плохо подходил для связей "многие-ко-многим" и для решения этой проблемы была придумана реляционная модель.
Позже обнаружилось, что реляционная модель не слишком подходит для некоторых приложений - после чего стали появляться не реляционные хранилища данных NoSQL различных разновидностей: документоориентированные БД, графовые БД и т.д. Каждая модель данных более хорошо подходит для решения одних задач и не очень хорошо подходит для решения других, поэтому нужно понимать какое хранилище данных необходимо выбрать в зависимости от поставленной задачи.

Чтобы осуществить осознанный выбор хранилища, необходимо понимать, какие структуры данных и подходы использованы внутри, и как БД выстраивают данные на диске, чтобы их можно было эффективно находить. Этот вопрос раскрывается в главе 3 "Подсистемы хранения и извлечения данных", в которой рассматриваются 2 широкие категории подсистем хранения данных: 
  • Оптимизированных для обработки транзакций (OLTP).
    OLTP-системы обычно нацелены на работу с пользователями, и это означает огромное потенциальное количество запросов.
  • Оптимизированные для аналитики (OLAP).
    OLAP-системы обрабатывают меньшее количество запросов, чем OLTP-системы, но все запросы обычно ресурсоемки и требуют просмотра миллионов строк за короткое время.
Требования к OLTP и OLAP системам противоположны, следовательно для реализации каждой подзадачи потребуется настраивать хранилище по-своему.

Разобравшись с хранилищами данных, автор книги переходит к рассмотрению вопросов форматов кодирования данных (сериализации) и эволюции схем с течением времени. В главе 4 "Кодирование и эволюция" рассматриваются способы преобразования структур данных в байты в сети или на диске. Для современных сервисных архитектур необходима поддержка плавающих обновлений с постепенным развертыванием новой версии сервиса лишь на нескольких узлах за один раз, а не на всех узлах одновременно. В данной главе рассматриваются подходы к обеспечению прямой и обратной совместимости форматов данных. Обсуждаются несколько форматов кодирования, их свойства совместимости и удобства использования в долгосрочной перспективе:
  • Форматы, ориентированные на конкретный язык (например, сериализация объектов в java)
  • Текстовые форматы, подобные JSON, XML, CSV
  • Форматы, основанные на двоичных схемах, например, Thrift, Protocol Buffers и Avro
Обсуждается в данной главе и несколько возможных режимов движения потоков данных:
  • Запись и чтения данных разными потоками в одну БД.
  • RPC вызовы, REST API взаимодействие.
  • Асинхронная передача сообщений.

Распределенные данные

Во второй части книги автор переходит от данных, хранимых на одной машине, к данным распределенным по нескольким компьютерам. Распределить данные по нескольким узлам может потребоваться по нескольким поводам: 
  • Масштабируемость.
    Если объем данных, нагрузка по чтению или записи перерастают возможности одной машины.
  • Отказоустойчивость или высокая доступность.
    Если приложение должно продолжать работать даже в случае сбоя одной из машин. 
  • Снижение сетевой задержки.
    При наличии пользователей по всему миру необходимы серверы в разных точках земного шара, чтобы каждый пользователь обслуживался ЦОДом, географически расположенным максимально близко к нему.

Репликация

Повествование в этой части начинается с обсуждения организации репликации - хранения копий одних и тех же данных на нескольких машинах, соединенных сетью. Репликация может служить нескольким целям:
  • Высокая доступность. Сохранение работоспособности системы, в случае отказа одной или нескольких машин.
  • Работа в офлайн-режиме. Возможность продолжения работы приложения в случае прерывания соединения с сетью.
  • Задержка. Данные размещаются географически близко к пользователям.
  • Масштабирование. Возможность обрабатывать большие объемы операций чтения, чем способна обработать 1 машина.
В главе 5 "Репликация" обсуждается 3 основных метода репликации.
  • С одним ведущим узлом. Клиенты отправляют информацию по операциям записи одному ведущему узлу, который отправляет поток событий изменения данных ведомым узлам.
  • С несколькими ведущими узлами. Клиенты отправляют информацию о каждой из операций записи одному из нескольких ведущих узлов, могущих эту информацию принимать. Ведущие могут отправлять поток событий изменения данных друг другу и любым ведомым.
  • Без ведущего узла. Клиенты отправляют информацию о каждой из операций записи одному из нескольких узлов и читают из нескольких параллельно, чтобы обнаружить узлы с устаревшими данными и внести поправки.

Секционирование

В случае очень больших наборов данных или объемов обрабатываемой информации репликации недостаточно: появляется необходимость разбить данные на секции, иначе говоря, выполнить шардинг данных. Секционирование требуется, когда размер набора данных не позволяет хранить и обрабатывать его на одной машине. Основной целью секционирования является масштабируемость. Разные секции можно разместить в различных узлах в кластере. Каждый узел может независимо обрабатывать запросы в своей секции, так что пропускную способность по запросам можно масштабировать добавлением новых узлов, а сложные запросы перераспределить по нескольким узлам.
В главе 6 "Секционирование" рассмотрены 2 основных подхода:
  • Секционирование по диапазонам ключа. Ключи сортируются и секция содержит все ключи определенного диапазона.
  • Хэш-секционирование. От ключа вычисляется хэш-функция, которая определяет секцию.
Вторичные индексы в БД тесно связаны с секционированием: при секционировании данных индексы надо как-то тоже секционировать. Автор рассматривает основные подходы при секционировании индексов:
  • Секционирование по документам (локальные индексы). При записи, обновление индекса происходит в одной секции, а при чтении из индекса требуют фрагментированного чтения из нескольких секций.
  • Секционирование по термам (глобальные индексы). При записи, обновление индекса происходит в одной секции, а чтение из индекса можно выдать из одной секции.
Секционирование требует обычно координации кластера - для этого используется сервис координации, например, ZooKeeper.

Транзакции

В седьмой главе "Транзакции" говорится о достижении надежности систем. В реальности в работе приложения очень многое может пойти не так, например, ПО или аппаратное обеспечение базы данных может отказать в любой момент; разрывы сети могут неожиданно отрезать приложение от базы данных; несколько клиентов могут производить операции записи одновременно, перезаписывая изменения друг друга; клиент может прочитать данные, которые не имеют смысла, поскольку были обновлены частично; состояния гонки между клиентами могут привести к неожиданным ошибкам.

Надежная система должна уметь справляться со всеми этими сбоями и гарантировать, что они не приведут к внезапному и полному отказу системы. Транзакции в течении десятилетий считались предпочтительным механизмом решения этой проблемы. Транзакция - это способ группировки приложением нескольких операций записи и чтения в одну логическую единицу. Это означает, что все операции записи и чтения в рамках транзакции выполняются как одна: либо вся транзакция выполняется целиком успешно (с фиксацией изменений), либо целиком завершается неудачно (с прерыванием и откатом). Такая модель "все или ничего" сильно упрощает для приложения обработку ошибок, поскольку нет нужды заботиться  частичных отказах: когда часть операций завершилась успешно, а часть нет.

Может показаться, что использование транзакций - это очевидный шаг. Но транзакции не являются законом природы и были созданы лишь для упрощения модели работы с данными. Благодаря транзакциям приложение может игнорировать определенные сценарии ошибок и вопросов конкурентного доступа, поскольку этим занимается база данных. Широкий класс ошибок сводится к простому прерыванию транзакции, а приложению достаточно просто повторить выполнение транзакции. Но не для всякого приложения транзакции нужны. Обеспечение транзакционности процессов - непростая задача, транзакции имеют определенную стоимость и нужно понимать в конкретной ситуации где необходимо использовать транзакции, а где можно ими пожертвовать для производительности.

В главе рассматриваются примеры возможных проблем, алгоритмы, которые используют базы данных для их предотвращения. Все вопросы данной главы относятся к одноузловым базам данных. Раскрывается смысл аббревиатуры ACID (atomicity, consistency, isolation, durability). Большое внимание уделяется вопросу конкурентного доступа и широко рассмотрены используемые уровни изоляции, например, чтение зафиксированных данных (read commited), изоляцию снимков состояния (например MVCC), сериализуемость. Рассмотрены в главе характеристики уровней изоляции на разнообразных примерах состояния гонки.

  • "Грязные" операции чтения. Клиент читает записанные данные другим клиентом до их фиксации. Уровень изоляции чтения зафиксированных данных и более сильные предотвращают "грязные" операции чтения.
  • "Грязные" операции записи. Клиент перезаписывает данные, которые другой клиент записал, но еще не зафиксировал. Практически все реализации транзакций предотвращают "грязные" операции записи.
  • Ассиметрия чтения (невоспроизводимое чтение). Клиент видит различные части базы данных по состоянию на разные моменты времени. Чаще всего такую проблему предотвращают с помощью изоляции снимков состояния, при которой транзакция читает данные из согласованного снимка состояния, соответствующего определенному моменту времени. Обычно реализуется благодаря Multiversion Concurrency Control (MVCC).
  • Потерянные обновления. Два клиента выполняют в конкурентном режиме цикл чтения - изменения - записи. Один перезаписывает записанные другим данные без учета внесенных им изменений, так что данные оказываются потеряны. Некоторые реализации изоляции снимков состояния предотвращают эту аномалию автоматически, в других требуется установка блокировки вручную (SELECT FOR UPDATE).
  • Ассиметрия записи. Транзакция читает какие-либо данные, принимает на основе прочитанного значения решение о дальнейших действиях и выполняет операцию записи в базу данных. Однако на момент ее выполнения исходные условия, на основе которых принималось решение, более не соответствует действительности. Эту аномалию предотвращает только сериализуемость.
  • Фантомные чтения. Транзакция читает объекты, соответствующие определенному условию поиска. Другой клиент выполняет операцию записи, которая каким-то образом влияет на результаты этого поиска. Изоляция снимков состояния предотвращает непосредственно фантомные чтения, но фантомы в контексте ассиметрии записи требуют отдельной обработки, например блокировок по диапазону значения индекса.
При использовании слабых уровней изоляции некоторые из этих аномалий придется обрабатывать вручную в коде вашего приложения. Только изоляция уровня сериализуемых транзакций предотвращает все подобные проблемы. В главе также обсуждаются 3 различных подхода к реализации сериализуемых транзакций.
  • По-настоящему последовательное выполнение транзакций
  • Двухфазная блокировка 2PL (не путать с двухфазной фиксацией 2PC).
  • Сериализуемая изоляция снимков состояния (Serializable Snapshot Isolation SSI).

Проблемы распределенных систем

В главе 8 "Проблемы распределенных систем" говорится о том, что работа с распределенными системами принципиально отличается от написания ПО для отдельного компьютера - и главное отличие состоит во множестве новых захватывающих проблем. В главе обсуждается широкий спектр проблем, возникающих в распределенных системах.
  • Вероятность потери или задержки на произвольное время отправленного по сети пакета. Аналогично может потеряться или задержаться ответ, так что при его отсутствии вы не будете знать, дошло ли ваше сообщение до адресата.
  • Часы узла могут оказаться существенно рассинхронизированными с другими узлами (несмотря на все ваши старания по настройке NTP), неожиданно прескакивать вперед или назад во времени, и полагаться на них опасно, ведь вы не знаете интервала погрешности.
  • Выполнение процесса может быть приостановлено в любой момент на существенное время (например, вследствие паузы на сборку мусора), его могут объявить неработающим другие узлы, причем он способен потом возобновить работу, понятия не имея, что останавливался.
Возможность частичных отказов узлов становится определяющей характеристикой распределенной системы. Программное обеспечение должно учитывать вероятность отказа узла, и обеспечивать устойчивость к таким отказам.

Для обеспечения устойчивости к отказам необходимо сначала их обнаружить, но даже это нетривиальная задача. В большинстве систем нет безошибочных механизмов обнаружения сбоев узлов. Большинство распределенных алгоритмов определяет доступность удаленного узла по времени ожидания ответа. Однако, данный метод не позволят отличить сетевые отказы от отказов самого узла.

Даже когда сбой обнаружен, справиться с ним - непростая задача для системы: нет ни глобальных переменных, ни разделяемой памяти, ни общих знаний, ни какого-либо другого разделяемого всеми машинами состояния. Узлы не могут даже согласовать время, не говоря уже о чем-то более существенном. Единственный способ передачи информации между узлами - отправка по ненадежной сети. Один узел не может принять никаких серьезных решений, так что необходимы протоколы, вовлекающие другие узлы и ищущие кворум для согласования решения.

Если вы привыкли писать ПО в идеальном мире одного компьютера, где одна операция всегда детерминистически возвращает тот же самый результат, то переход к запутанной реальности распределенных систем может оказаться для вас шоком. Если вы в состоянии не открывать ящик Пандоры и держать все на одной машине, то определенно есть смысл это сделать. Однако,  для решения задач масштабируемости, отказоустойчивости и низкого времени ожидания (благодаря размещению данных географически близко к пользователю) обойтись одним узлом невозможно, и вам придется научиться решать описанные выше проблемы распределенных систем и научиться справляться с внутренними сбоями и обеспечивать надежную работу сервиса.

Согласованность и консенсус

В главе 9 "Согласованность и консенсус" рассматривается несколько примеров алгоритмов и протоколов для построения отказоустойчивых распределенных систем. При построении системы необходимо исходить из того, что в системе способны возникнуть все проблемы, представленные в предыдущей главе. Лучший способ построить отказоустойчивую систему -  создать некоторые абстракции с полезными гарантиями, реализовать их один раз, а затем позволить приложениям полагаться на эти гарантии. Данный подход аналогичен случаю с транзакциями: при использовании транзакций приложение может симулировать отсутствие сбоев, потому что транзакция предоставляет гарантии атомарности, консистентности, изоляции, долговечности (ACID). На самом деле сбои происходят, но абстракция транзакции скрывает эти проблемы так, что приложение может о них не беспокоиться.

Для распределенных систем важными абстракциями согласованности являются консенсус - согласованность между всеми узлами по какому-то вопросу, и линеаризуемость. Цель линеаризуемости состоит в том, чтобы реплицированные данные выглядели так, как будто существует только одна копия и все операции воздействует на нее атомарно. Модель линеаризуемости легко понять, тем не менее она приводит к тому, что база данных ведет себя как переменная в однопоточной программе - и ее недостаток заключается в замедлении работы системы, особенно в системах с большими системами задержки.

Данное ограничение можно ослабить, если научиться выделять причинно-следственные взаимосвязи между событиями в базе данных. В отличие от линеаризуемости, выстраивающей все операции в единую, полностью упорядоченную временную последовательность, причинность позволяет построить более слабую модель согласованности: отдельные события могут быть конкурентными, как в истории системы контроля версий с ее ветвлениями и слияниями. Причинно-следственный порядок можно зафиксировать с помощью временных меток Лампорта, однако все равно остаются системы, которые не могут быть реализованы таким образом. Например, при одновременной регистрации пользователя с одинаковым именем, один пользователь должен быть зарегистрирован, а второй должен получить ошибку. Данная проблема решается с помощью консенсуса.

Надежное обеспечение консенсуса, несмотря на сетевые сбои и отказы процессов - удивительно сложная задача. Достичь консенсуса означает принять решение о чем-то таким образом, чтобы с этим решением согласились все узлы и данное решение являлось неотменяемым. Автором утверждается, что весь широкий спектр проблем фактически сводится к консенсусу, они эквивалентны (в том смысле, что если есть решение для одной из них, то его можно легко превратить в таковое для остальных). К таким эквивалентным проблемам относятся следующие.

  • Линейные реестры сравнения с присвоением. Реестр должен атомарно принять решение, присваивать ли значение в зависимости от того, соответствуют ли его текущее значение параметру, указанному в операции.
  • Атомарная транзакция. База данных должна принять решение, следует ли завершать или отменить распределенную транзакцию.
  • Рассылка общей последовательности. Система обмена сообщениями должна принять решение о последовательности доставки сообщений.
  • Блокировки и аренда. Когда несколько клиентов ратуют за блокировку или ее отмену, блокировка принимает решение о том, какой из них выбрать.
  • Сервис членства и координации. Основываясь на детекторе отказа (например большое время задержки), система должна принять решение, какие узлы активны, а какие следует считать вышедшими из строя, потому что их сеансы были отключены.
  • Ограничение уникальности. Когда несколько транзакций конкурентно пытаются создать конфликтующие записи с одним и тем же ключом, ограничение должно принять решение, какую из них разрешить, а какие - отменить по причине нарушения ограничения.

Все описанное очень просто, если узел только один или в случае, когда только один узел имеет возможность принимать решения. Именно это происходит в базе данных с одним ведущим узлом: на него возложены полномочия по принятию решений. Поэтому такие базы данных могут предоставлять линеаризуемые операции, ограничения уникальности, полностью упорядоченный журнал репликации и др.

Однако, если этот единственный ведущий узел выходит из строя или становится недоступным вследствие разрыва сетевого соединения, то такая система становится неработоспособной. Существует 3 способа выхода из данной ситуации.

  1. Дождаться, пока ведущий узел восстановится, и согласиться с тем, что все это время система будет заблокирована. Многие координаторы XA/JTA-транзакций выбирают именно такой вариант. Данный метод не полностью решает задачу консенсуса, поскольку не удовлетворяет требованию завершения: если ведущий узел не восстановится, то система может остаться заблокированной навсегда.
  2. Решить проблему вручную - пускай человек выберет новый ведущий узел и перенастроит систему для его использования. Такой вариант применяется во многих реляционных БД. Это своего рода консенсус методом "вмешательства высшей силы": решение принимает человек-оператор, находящийся вне компьютерной системы. Скорость разрешения проблемы ограничена той скоростью, с которой могут действовать люди, - как правило, медленнее, чем компьютеры.
  3. Использовать алгоритм автоматического выбора нового ведущего узла. Такой вариант требует консенсусного алгоритма, и рекомендуется задействовать проверенный алгоритм, который правильно отрабатывает неблагоприятные сетевые условия. 

Несмотря на то, что база данных с одним ведущим узлом способна обеспечить линеаризуемость без использования консенсусного алгоритма для каждой записи, она по-прежнему требует консенсуса для поддержания приоритета выборов ведущего узла. Таким образом, в некотором смысле наличие этого узла - лишь полумера: консенсус по-прежнему необходим, только в другом месте и реже.

Хорошая новость - существуют отказоустойчивые консенсусные алгоритмы, такие как Viewstamped Replication (VSR), Paxos, Raft и Zab, и системы, такие как ZooKeeper и etcd.

Такие инструменты, как ZooKeeper, играют важную роль в обеспечении "аутсорсинга" консенсуса, обнаружении сбоев и обслуживании членства, которое могут задействовоать приложения. Данная система непроста в применении, но это намного лучше, чем пытаться разработать собственные алгоритмы для решения всех проблем, описанных в главе "Проблемы распределенных систем". Если однажды захотите решить одну из задач, которые сводятся к консенсусу, и сделать решение отказоустойчивым, то автор советует использовать что-то наподобие ZooKeeper.

Однако не каждая система обязательно требует консенсуса: например, системы репликации без ведущего узла и системы с несколькими такими узлами обычно не используют глобальный консенсус. Конфликты, возникающие в подобных системах, являются следствием отсутствия консенсуса между разными ведущими узлами. Но, может быть, это нормально: вероятно, нужно лишь научиться обходиться без линеаризуемости и лучше работать со структурами данных, история которых имеет ветвления с последующим слиянием версий.

На данной главе заканчивается большая теоретическая часть книги "Распределенные данные" и автор переходит к рассмотрению практических распределенных систем, состоящих из гетерогенных блоков.

Производные данные

Заложив теоретическую основу распределенных систем в предыдущих главах, автор переходит к рассмотрению интеграции нескольких разных информационных систем в одну согласованную архитектуру.

На практике информационные системы могут быть достаточно сложными. В большом приложении часто приходится получать и обрабатывать данные самыми разными способами, и ни одна база данных не способна удовлетворить всем этим требованиям. Поэтому в приложениях обычно используется сочетание нескольких информационных хранилищ, индексов, кэшей, аналитических систем и т.д. и реализованы механизму перемещения данных из одного хранилища в другое. Каждое такое хранилище может иметь различную модель хранения и быть оптимизировано для разных шаблонов доступа. Интеграция разнотипных систем является одной из самых важных задач, которую необходимо решить при построении нетривиального решения.

На высоком уровне системы хранения и обработки информации могут быть сгруппированы в две обширные категории.

  • Системы записи. Известны как источники правды. Система записи содержит надежную версию данных. Когда появляются новые данные, например, в результате пользовательского ввода, они сначала записываются сюда. Каждый факт представлен ровно один раз (представление обычно нормализуется). В случае каких-либо расхождений с другой системой правильным (по определению) считается значение в системе записи.
  • Производные информационные системы. Данные в такой системе являются результатом получения неких уже существующих данных из другой системы, которые каким-то образом были преобразованы или обработаны. В случае потери производные данные можно воссоздать из исходного источника. К этой категории относятся кэши, денормализованные значения, индексы, материализованные представления, прогнозируемые сводные данные.
Производные данные обычно избыточны и денормализованы. Для таких данных часто бывает важно обеспечить хорошую скорость выполнения запросов чтения. Можно создать несколько наборов данных на основе одного источника, и это позволит просматривать данные с разных точек зрения.

Обычно большинство баз данных и систем хранения не являются по своей сути не системами записи ни производными информационными системами. БД - это просто инструмент: важно как вы используете этот инструмент. Различие между системой записи и производной информационной системой зависит только от того, как вы применяете инструмент в своем приложении.

Автор выделяет 3 класса систем по способу построения:
  • Сервисы (онлайновые системы). Сервис ожидает запросов и инструкций, поступающих от клиента. После того как запрос получен, сервис пытается обработать его максимально быстро и возвращает отклик. Последний обычно является мерой производительности сервиса. Кроме того, очень важна доступность (если клиент не может получить доступ к сервису, то пользователь, скорее всего, получит сообщение об ошибке).
  • Системы пакетной обработки (автономные системы). Такая система принимает большое количество данных, запускает задачу для обработки и выдает данные на выход. Подобные задачи обычно выполняются довольно долго, так что пользователь, как правило, не ждет их окончания. Зато пакетные задачи часто запускаются по расписанию через определенные промежутки времени. Главной мерой производительности пакетной задачи обычно является пропускная способность (время, необходимое для обработки входного набора данных установленного размера).
  • Системы поточной обработки (почти реального времени). Поточная обработка представляет собой нечто среднее между онлайновой и пакетной системой. Как при пакетной обработке, эта система принимает данные на входе и генерирует выходные данные. Однако в случае появления событий поточные задачи реагируют на них, в то время как пакетные работают только с фиксированным набором данных. Благодаря этому отличию время ожидания у поточных систем меньше, чем у аналогичных пакетных. 

Пакетная обработка

В 10-ой главе рассматривается тема пакетной обработки. Философия пакетной обработки заложена в стандартных средствах Unix, таких как awk, grep и sort. Философия данных инструментов переносится в популярный ранее алгоритм MapReduce и в более современные системы потоковой обработки данных, такие как Spark, Tez и Flink.  Часть из описанных принципов проектирования заключается в том, что входные данные неизменяемы, а выходные предназначены служить входными для другой (пока неизвестной) программы. Для решения сложных задач создаются небольшие приложения, которые хорошо делают что-то одно, данные приложения (инструменты) соединяются в сложные конвейеры обработки данных с помощью специальных интерфейсов.

В мире Unix предусмотрен стандартный интерфейс, позволяющий соединять одну программу с другой и основанный на применении файлов и каналов; в MapReduce этот интерфейс является распредленной файловой системой (HDFS). Системы потовой обработки добавляют свои конвейероподобные механизмы передачи данных, чтобы избежать материализации промежуточных состояний в распределенной файловой системе, но исходные и окончательные выходные данные задачи этих систем по-прежнему обычно хранятся в HDFS.

Распределенные системы пакетной обработки данных решают две основные проблемы.

  • Секционирование. В MapReduce функции сопоставления размещаются в секциях в соответствии с входными файловыми блоками. Выходные данные переупорядочиваются, сортируются и объединяются в разделы сжатия, количество которых определяется в зависимости от задачи. Цель этого процесса - разместить все связанные данные, например все записи с одинаковым ключом, в одной секции.
    Системы потоковой обработки данных, созданные на базе MapReduce стараются избегать сортировки там, где это не требуется, но в остальном широко используют принцип секционирования.
  • Отказоустойчивость. MapReduce часто записывает данные на диск, что упрощает восстановление после неудачно совершенной операции без повторного выполнения всей задачи, но замедлят выполнение при отсутствии сбоев. Системы потоковой  обработки данных реже используют материализацию промежуточных состояний и держат больше данных в памяти. Это значит, что при выходе узла из строя им приходится выполнять больше повторных вычислений. Детерминированные операторы сокращают объем данных, которые необходимо пересчитывать.

В главе также обсуждаются несколько алгоритмов объединения данных в MapReduce, которые служат хорошей иллюстрацией того, как работают алгоритмы секционирования: объединение с сортировкой слияния, широковещетельное объединение по хэшу, секционированные хэш-объединения.

Распределенные системы пакетной обработки имеют преднамеренно ограниченную модель программирования: функции обратного вызова (callbacks) для сопоставления и сжатия, которые не меняют состояние и не имеют внешних видимых побочных эффектов (side effects), кроме генерируемых ими выходных данных. Это ограничение позволяет системе скрыть некоторые проблемы жестких распределенных систем за абстракцией: в случае сбоев и сетевых проблем задачи можно безопасно повторить, а выходные данные неудачно завершенных задач отбрасываются. Если несколько задач для какой-либо секции выполнены успешно, то только одна из них генерирует фактически видимые выходные данные.

Благодаря такой системе при разработке кода для пакетной обработки не приходится беспокоиться о механизмах отказоустойчивости: система сама гарантирует состояние конечного результата задачи таким же, как если бы никаких ошибок не произошло, хотя в действительности различные операции, возможно, пришлось повторить. Эти надежные семантические структуры намного надежнее, чем те, что обычно применяются в онлайн-сервисах, обрабатывающих пользовательские запросы, и имеют побочные результаты обработки запроса в виде информации, записанной в базу данных.

Характерная особенность задачи пакетной обработки такова: она считывает входные данные и генерирует выходные, не изменяя исходную информацию, - другими словами, выход отделен от входа. Крайне важно, что входные  данные ограничены: имеют известный фиксированный размер (например, состоят из набора файлов журнала, скопированных в определенный момент времени, или копии содержимого базы данных). Поскольку набор входных данных ограничен, задача знает, когда закончено его чтение, и завершается в этот момент.

Потоковая обработка

Потоковая обработка данных отличается от пакетной, рассмотренной в предыдущей главе, тем, что ее входные данные не ограничены. То есть у нас все еще есть задача, но на ее входе - бесконечные потоки информации.

На практике поток данных часто неограничен, информация приходит постепенно, с течением времени. Пользователи генерируют информацию сегодня, как и вчера, и продолжат делать это завтра. Процесс никогда не закончится, если только вы не решили закрыть бизнес. Пакетным процессорам приходится искусственно делить информацию на фиксированные временные промежутки: например,  в конце каждого дня обрабатывать данные, полученные за этот день, или в конце часа - данные поступившие за последний час. Проблема с ежедневными пакетными процессами заключается в том, что изменения входной информации отражаются на выходе только через день - для многих нетерпеливых пользователей это слишком долго. Поэтому возникает идея потоковой обработки: просто обрабатывать события по мере их возникновения.

Потоком называют данные, которые становятся доступными постепенно, с течением времени. Эта концепция находит самое разное применение: stdin и stdout Unix, в языках программирования Lazy списки, в API файловых систем (FileInputStream), в TCP-соединениях, технологиях доставки аудио- и видео содержимого через Интернет...

В главе рассматриваются потоки событий как механизм управления данными: аналог рассматриваемых в предыдущей главе пакетных данных, но только без ограничений и с возможностью поэтапной обработки. Для передачи потоков событий между узлами часто используются системы обмена сообщениями - брокеры сообщений. В главе рассмотрены два типа брокеров сообщений:

  • Классические брокеры типа AMQP/JMS. (Rabbit MQ, ActiveMQ, Artemis, Qpid, TIBCO EMS, IBM MQ...) Брокер отправляет сообщения потребителям, а те высылают подтверждение каждый раз, когда сообщение успешно обработано. После получения подтверждения сообщения удаляются из брокера сообщений.
  • Брокер на основе журналирования. (Apache Kafka, Amazon Kinesis Stream, Twitter DitributedLog, Google Cloud Pub/Sub) Брокер отправляет все сообщения из данного раздела одному узлу-потребителю и всегда в одном и том же порядке. Параллелизм достигается путем секционирования, а потребители отслеживают прогресс, проверяя смещение последнего обработанного сообщения. Брокер сохраняет сообщения на диске, поэтому при необходимости можно вернуться и прочитать старые сообщения заново. Несмотря на то, что эти брокеры записывают все сообщения на диск, они способны обеспечивать пропускную способность до миллионов сообщений в секунду за счет секционирования на нескольких машинах и высокую отказоустойчивость - благодаря репликации сообщений.
Применение журналов в потоковой обработке похоже на использование журналов репликации в базах данных и системы хранения информации на основе журналирования. 

В отношении потоков есть несколько возможностей: так, события активности пользователей, периодически считываемые показания датчиков и каналы данных (например, информация о рынках в финансовой сфере) естественно представить в виде потоков. Также полезно осуществлять запись в базу данных в виде потока: можно фиксировать историю всех изменений, сделанных в базе данных, - либо неявно, путем захвата данных об изменениях (change data capture CDC), либо явно, через источники событий. Уплотнение журнала позволяет потоку сохранять полную копию содержимого в БД.

Представление баз данных в виде потоков открывает широкие возможности для интеграции систем. Можно постоянно поддерживать актуальность производных информационных систем, таких как индексы поиска, кэши и аналитические системы, потребляя журнал изменений и применяя его к производной системе. Можно даже создавать новые представления для уже существующих данных, потребляя журнал изменений с самого начала вплоть до настоящего времени.

Средства для поддержания состояния в виде потоков и повторения сообщений легли в основу технологий,  которые позволяют объединять потоки и обеспечивают отказоустойчивость в различных системах потоковой обработки, в том числе поиск событий по шаблону, вычисление оконных агрегаций (анализ потоков) и поддержание производных информационных систем в актуальном состоянии (материализованные представления).

В главе рассмотрены трудности, возникающие при определении временных интервалов в потоковых процессорах, в том числе рассмотрены различия между временем обработки и временными метками событий. Рассмотрена проблема взаимодействия с событиями, которые появляются после того, как временное окно было закрыто.

Также в главе рассматриваются 3 типа объединения потоков:
  • Объединение "поток-поток". Оба входных потока состоят из событий активности. Оператор объединения ищет связанные события, произошедшие в течение некоего времени. Например, это могут быть два действия одного и того же пользователя, интервал между которыми не превышает 30 минут. Если нужно найти связанные события в одном и том же потоке, то оба набора входных данных объединения могут принадлежать одному потоку. 
  • Объединение "поток-таблица". Один входной поток состоит из событий активности,  а второй представляет собой журнал изменений базы данных. В журнале хранится локальная копия БД. Для каждого события активности оператор объединения делает запрос в базу данных и выводит событие активности, обогащенное информацией из БД.
  • Объединение "таблица-таблица". Оба входных потока являются изменениями базы данных. В этом случае каждое изменение из одной таблицы объединяется с последним состоянием из другой. Результатом является поток изменений, который передается в материализованное представление объединения двух таблиц.
В главе уделено внимание методам обеспечения отказоустойчивости и семантики "выполнение один раз" в потоковом процессоре. Как и в пакетной обработке, здесь нужно отбрасывать частичный вывод неудачно завершенных задач. Однако, поскольку выполнение потока длительное и результат выводится непрерывно, нельзя просто отказаться от всех выходных данных. Вместо этого применяется более дробный механизм восстановления, основанный на микропакетах (например в Spark Streaming и Apache Flink), контрольных точках, транзакциях и идемпотемтных операциях.

Будущее информационных систем

В заключительной главе книги обсуждаются новые (на момент написания книги) подходы к проектированию информационных систем, в которую включены личные мнения и размышления автора о будущем. Нет ни одного инструмента, который бы эффективно справлялся со всеми возможными вариантами использования, поэтому каждый раз для достижения своих целей необходимо комбинировать разные программные продукты. Проблему интеграции данных можно решить с помощью пакетной обработки и потоков событий, обеспечивая передачу данных между разными системами.

В предложенном в главе методе разделения систем одни системы были отнесены к системам записи, в то время как другие получают от них данные путем преобразований. Таким образом можно поддерживать индексы, материализованные представления, модели машинного обучения, статистические сводки и многое другое. Сделав эти преобразования и трансформации асинхронными и слабосвязанными, мы не даем локальным проблемам распространиться на другие части системы, тем самым повышая надежность и отказоустойчивость системы в целом.

Представление потоков данных как преобразований из одного набора данных в другой помогает развивать приложения: чтобы изменить один из этапов обработки (например, структуру индекса или кэша), достаточно просто запустить новый код преобразования всего набора входных данных и получить результат. Аналогичным образом, если что-то пойдет не так, можно исправить код и снова обработать данные с целью их восстановить.

Эти процессы очень похожи на те, что реализованы внутри баз данных, и, следовательно, мы переходим к идее приложений информационных потоков как разделению компонентов БД и построению приложения путем составления этих слабосвязных компонентов.

Производное состояние может быть обновлено с помощью отслеживания изменений в базовых данных. Более того, само производное состояние способно дополнительно отслеживаться потребителями, расположенными ниже по потоку. Можно даже довести этот поток данных до конечного пользователя, для которого отображаются данные, и таким образом создавать динамически обновляемые UI, чтобы отражать изменения данных и продолжать работать в автономном режиме.

В главе обсуждается, как сделать так, чтобы вся эта обработка оставалась корректной в случае сбоев. Надежные гарантии целостности могут быть реализованы с возможностью масштабирования при условии асинхронной обработки событий с помощью сквозных идентификаторов операций, которые позволяют сделать операции идемпотемтными, и путем асинхронной проверки ограничений. Клиенты могут либо дождаться проверки, либо продолжить не дожидаясь и потом принести извинения за нарушения ограничений (или выполнить компенсирующую операцию). Этот метод гораздо более масштабируемый и надежный, чем традиционный подход с использованием распределенных транзакций, и именно так на практике работают многие бизнес-процессы.

Структурируя приложения вокруг потока данных и асинхронно проверяя ограничения,  мы можем избежать избыточной координации и создавать системы, сохраняющие целостность, но при этом хорошо работающие даже в географически распределенных структурах при наличии сбоев. В главе затрагивается тема аудита для проверки целостности данных и обнаружения повреждений данных.

В заключении главы и книги автором рассматриваются эстетические аспекты построения высоконагруженных данными приложений. Данные можно применять для благих целей, но все же они способны причинить значительный ущерб: например, стать обоснованием для решений, серьезно влияющих на жизни людей. Использование данных и алгоритмов машинного обучения для принятия решений о выдаче кредита, о приеме на работу и т.п. получает все большее распространение в наши дни. Человек, которого какой-то алгоритм (оправданно или по ошибке) посчитал ненадежным, может столкнуться с большим количеством таких "нет". Систематический отказ от работы, авиапутешествий, предоставления страховки, аренды недвижимости, финансовых услуг и других ключевых аспектов деятельности является большим ограничением свободы личности, что его назвают "алгоритмической тюрьмой". В странах, где уважают права человека, система уголовного правосудия предполагает невиновность до тех пор, пока вина не будет доказана; однако автоматизированные системы могут систематически и произвольно исключать человека из участия в общественной жизни без каких-либо доказательств вины с малой вероятностью обжалования. Это приводит к дискриминации, слежке и раскрытию конфиденциальной информации. Из-за риска утечки данных использование данных в благих намерениях может иметь непредвиденные последствия.

Поскольку программное обеспечение и информация столь сильно влияют на окружающий мир, мы, разработчики, должны помнить, что несем ответственность за работу, направленную в мир, в котором хотим жить, - мир, относящийся к людям гуманно и уважительно. Я надеюсь что, что вместе мы достигнем этой цели.
Мартин Клеппман

Заключение

Практикующий инженер, который участвует в разработке распределенных приложений, которые общаются друг с другом по сети, обязательно найдет в книге Мартина Клеппмана что-то полезное. В каждой главе очень много ключевых слов и названия различных библиотек фреймворков, а в конце каждой главы есть огромный список литературы, в котором можно найти уйму интересного материала для более глубокого изучения темы. Данную книгу интересно читать с любого места и она заслужено может стать вашей настольной книгой.