Брокеры сообщений — это приложения-посредники, используемые в различных видах архитектуры программного обеспечения, например, в микросервисах. Брокеры пересылают информацию в виде сообщений от сервиса к сервису.
С брокерами сообщений тесно связан паттерн «publisher-subscriber», суть которого заключается в передаче некоторых событий от так называемых «издателей» (Producers) к потребителям (Consumers).
Чаще всего в реализации этого паттерна участвует промежуточное звено — тот самый брокер сообщений.
Для простоты можно провести аналогию. Представим YouTube: там есть каналы, у которых можно подписаться на уведомления. Далее происходит событие — публикация нового видео. Мы подписаны на это событие и получаем уведомление о том, что видео вышло.
VDS и VPS
биллингом по всему миру: Россия, Азия и Европа.
Как работают приложения с брокерами сообщений?
Тут нас будут интересовать три главных термина: Exchange, Queue, Binding.
Exchange
Сюда попадают сообщения-события. Exchange решает, в какую очередь какие события попадут. Это происходит на основе связей — байндингов, о которых будет сказано далее. На схеме отметим Exchange вот таким способом:
Queue
Или просто очередь. Структура данных, получение которых построено по принципу «первый вошёл — первый вышел», или FIFO. Данные хранятся на диске или в ОЗУ и представляют непосредственно сообщения. Очередь отдает копии этих данных потребителям (consumers). Схематично изобразим:
Binding
Переводится как «привязка». Это набор правил, которые указывают Exchange, в какую из очередей должны попадать сообщения. Между Exchange и Queue может быть несколько привязок с разными параметрами.
Принцип работы
Producer — это какой-то сервис, который генерирует сообщения и посылает их брокеру. Consumer — это другой сервис, который при получении сообщения начинает его обработку. В случае той же аналогии с Ютубом, «издатель» — это сам YouTube, а «потребитель» — это ваш телефон, получающий пуш-уведомления
-
Producer отправляет сообщение, и оно попадает в Exchange.
-
Очередь хранит это сообщение — например, на диске.
-
Когда consumer готов принять сообщение (закончил обработку предыдущего или просто приложение-обработчик событий запустилось) — сервер посылает в него скопированные данные из очереди.
-
Consumer получает сообщение, затем как-то обрабатывает его и передаёт брокеру подтверждение (ACK).
-
После получения брокером подтверждения сообщение удаляется.
RabbitMQ
Это популярный open-source брокер сообщений. Он базируется на основе протокола AMQP (Advanced Message Queuing Protocol) — это открытый протокол для передачи сообщений-событий через специальный брокер. Сам протокол имеет очень много возможностей, и помимо RabbitMQ его реализует, например, Apache Qpid.
Сам «кролик» разработан на языке программирования Erlang, а его основные преимущества - пропускная способность и максимальная гибкость маршрутизации. Для «кролика» существует множество вариантов настройки правил того, какие сообщения и куда будут попадать на обработку.
Подключаемся к Рэббиту мы по протоколу TCP: клиент инициализирует подключение по адресу хоста и сохраняет соединение до тех пор, пока ему необходимо взаимодействовать с RabbitMQ. Он также поддерживает аутентификацию, например, по логину и паролю
Установка на разные ОС
RabbitMQ состоит из двух частей: сервер и WebUI (админка). В UI можно смотреть, что, собственно, происходит в нашем брокере: живы ли узлы в кластере, сколько сообщений в процессе обработки и т.д. Давайте разберёмся, как это всё устанавливать на разные ОС.
Для установки Rabbit на Ubuntu или Debian есть готовая инструкция на официальном сайте. Можно просто выполнить следующий скрипт в командной строке. Разберём его подробнее:
-
Установка стандартных зависимостей
-
Добавляем ключи репозиториев
- После выполнения скрипта нам необходимо включить UI. Выполним следующие команды:
-
Добавим пользователя, под которым мы будем заходить:
-
И выдадим ему все права:
Теперь мы можем зайти на адрес <ваш-IP>:15672, ввести логин и пароль и увидеть админку.
Обзор RabbitMQ Management
Разберём основные возможности RabbitMQ и его панели администрирования. На главной странице можно посмотреть общую информацию: узлы, их состояние, общее количество сообщений и т.д.
Connections — это список подключений к кластеру:
Channels — список каналов. В рамках одного подключения можно создавать несколько каналов, через которые отправляются сообщения. Это сделано для того, чтобы не создавать множество TCP-соединений.
Exchanges
Обменники рассмотрим подробнее.
Они имеют бывают разных видов, которые отличаются механизмом правил фильтрации сообщений — на основе этих правил события попадают в конкретные очереди.
- Direct Exchange
У каждого сообщения есть некоторый ключ — так называемый ключ маршрутизации (Routing Key). Эти сообщения попадут в те очереди, где в привязке к Exchange указан такой же ключ.
Пример настройки:
Указываем RoutingKey при создании обменника:
В созданном обменнике отправляем сообщение, указав такой же RoutingKey.
- Topic Exchange
Маршрутизация по некоторому ключу-шаблону. При создании шаблона могут использоваться 0 или более слов (латинские буквы в разных регистрах и цифры), с разделителем в виде точки (например, «key.event»), а также символы # и *.
- Fanout Exchange
Правила отсутствуют. Каждое отправленное сообщение попадает во все очереди.
- Headers Exchange
Использует заголовки байндинга и сообщения, сравнивает пары «ключ-значение» из этих заголовков.
Очереди
В отображении конкретной очереди можно увидеть график, где отображается, сколько сообщений находится в ней, статистика по времени доставки и принятия этих сообщений. Сообщения могут быть в двух статусах: Ready — ждут обработки, Unacked — в процессе обработки консьюмером.
Реализуем Publisher-Subscriber на Python
Разберём, как использовать RabbitMQ для реализации Pub-Sub на Python. У вас должен быть установлен Python. При написании этой статьи была использована версия 3.11.5.
Мы будем использовать библиотеку Pika. Есть и другие библиотеки, например, rabbitpy или amqp-lib, но они очень давно не обновлялись.
Выберите или создайте папку, в которой будет находиться код приложения, например:
Откройте эту папку в вашей IDE. Я использую Visual Studio Code, также можно использовать PyCharm или другие среды разработки.
Установим библиотеку Pika с помощью команды в терминале:
Добавим два файла: sender.py и receiver.py:
В файле sender.py опишем код для Producer, который будет отправлять сообщения. Для этого нужно:
-
Создать подключение к брокеру
-
Создать новый канал с помощью этого подключения
-
Создать новый обменник
-
Создать очередь
-
Привязать очередь к обменнику
-
Выполнить функцию публикации сообщения в созданную очередь
Код sender.py:
Теперь реализуем получение сообщения:
-
Добавим подключение и канал
-
Определим обменник и очередь с тем же названием, что и в файле
sender.py -
Привяжем очередь к обменнику
-
Опишем функцию, которая будет вызываться при получении нового сообщения
-
Вызовем метод
start_consumingдля того, чтобы запустить ожидание сообщений
Код receiver.py:
Сначала запустим receiver:
Далее запускаем sender:
И в терминале, где запущен receiver, мы видим сообщение:
В RabbitMQ Management мы можем зайти в созданный обменник и увидеть привязку к очереди по Routing key:
Подготовили для вас выгодные тарифы на облачные серверы
477 ₽/мес
657 ₽/мес
Заключение
В этой статье мы рассмотрели, что такое брокеры сообщений, как приложения работают с ними, а также научились устанавливать и использовать брокер сообщений RabbitMQ.
