Брокеры сообщений — это приложения-посредники, используемые в различных видах архитектуры программного обеспечения, например, в микросервисах. Брокеры пересылают информацию в виде сообщений от сервиса к сервису.
С брокерами сообщений тесно связан паттерн «publisher-subscriber», суть которого заключается в передаче некоторых событий от так называемых «издателей» (Producers) к потребителям (Consumers).
Чаще всего в реализации этого паттерна участвует промежуточное звено — тот самый брокер сообщений.
Для простоты можно провести аналогию. Представим YouTube: там есть каналы, у которых можно подписаться на уведомления. Далее происходит событие — публикация нового видео. Мы подписаны на это событие и получаем уведомление о том, что видео вышло.
Тут нас будут интересовать три главных термина: Exchange, Queue, Binding.
Сюда попадают сообщения-события. Exchange решает, в какую очередь какие события попадут. Это происходит на основе связей — байндингов, о которых будет сказано далее. На схеме отметим Exchange вот таким способом:
Или просто очередь. Структура данных, получение которых построено по принципу «первый вошёл — первый вышел», или FIFO. Данные хранятся на диске или в ОЗУ и представляют непосредственно сообщения. Очередь отдает копии этих данных потребителям (consumers). Схематично изобразим:
Переводится как «привязка». Это набор правил, которые указывают Exchange, в какую из очередей должны попадать сообщения. Между Exchange и Queue может быть несколько привязок с разными параметрами.
Producer — это какой-то сервис, который генерирует сообщения и посылает их брокеру. Consumer — это другой сервис, который при получении сообщения начинает его обработку. В случае той же аналогии с Ютубом, «издатель» — это сам YouTube, а «потребитель» — это ваш телефон, получающий пуш-уведомления
Producer отправляет сообщение, и оно попадает в Exchange.
Очередь хранит это сообщение — например, на диске.
Когда consumer готов принять сообщение (закончил обработку предыдущего или просто приложение-обработчик событий запустилось) — сервер посылает в него скопированные данные из очереди.
Consumer получает сообщение, затем как-то обрабатывает его и передаёт брокеру подтверждение (ACK).
После получения брокером подтверждения сообщение удаляется.
Это популярный open-source брокер сообщений. Он базируется на основе протокола AMQP (Advanced Message Queuing Protocol) — это открытый протокол для передачи сообщений-событий через специальный брокер. Сам протокол имеет очень много возможностей, и помимо RabbitMQ его реализует, например, Apache Qpid.
Сам «кролик» разработан на языке программирования Erlang, а его основные преимущества - пропускная способность и максимальная гибкость маршрутизации. Для «кролика» существует множество вариантов настройки правил того, какие сообщения и куда будут попадать на обработку.
Подключаемся к Рэббиту мы по протоколу TCP: клиент инициализирует подключение по адресу хоста и сохраняет соединение до тех пор, пока ему необходимо взаимодействовать с RabbitMQ. Он также поддерживает аутентификацию, например, по логину и паролю
RabbitMQ состоит из двух частей: сервер и WebUI (админка). В UI можно смотреть, что, собственно, происходит в нашем брокере: живы ли узлы в кластере, сколько сообщений в процессе обработки и т.д. Давайте разберёмся, как это всё устанавливать на разные ОС.
Ubuntu / Debian
Для установки Rabbit на Ubuntu или Debian есть готовая инструкция на официальном сайте. Можно просто выполнить следующий скрипт в командной строке. Разберём его подробнее:
Установка стандартных зависимостей
sudo apt-get update -y
sudo apt-get install curl gnupg -y
sudo apt-get install apt-transport-https
Добавляем ключи репозиториев
curl -1sLf "https://keys.openpgp.org/vks/v1/by-fingerprint/0A9AF2115F4687BD29803A206B73A36E6026DFCA" | sudo gpg --dearmor | sudo tee /usr/share/keyrings/com.rabbitmq.team.gpg > /dev/null
curl -1sLf https://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq-erlang.E495BB49CC4BBE5B.key | sudo gpg --dearmor | sudo tee /usr/share/keyrings/rabbitmq.E495BB49CC4BBE5B.gpg > /dev/null
curl -1sLf https://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq-server.9F4587F226208342.key | sudo gpg --dearmor | sudo tee /usr/share/keyrings/rabbitmq.9F4587F226208342.gpg > /dev/null
deb [signed-by=/usr/share/keyrings/rabbitmq.E495BB49CC4BBE5B.gpg] https://ppa1.novemberain.com/rabbitmq/rabbitmq-erlang/deb/ubuntu $distribution main
deb-src [signed-by=/usr/share/keyrings/rabbitmq.E495BB49CC4BBE5B.gpg] https://ppa1.novemberain.com/rabbitmq/rabbitmq-erlang/deb/ubuntu $distribution main
deb [signed-by=/usr/share/keyrings/rabbitmq.E495BB49CC4BBE5B.gpg] https://ppa2.novemberain.com/rabbitmq/rabbitmq-erlang/deb/ubuntu $distribution main
deb-src [signed-by=/usr/share/keyrings/rabbitmq.E495BB49CC4BBE5B.gpg] https://ppa2.novemberain.com/rabbitmq/rabbitmq-erlang/deb/ubuntu $distribution main
deb [signed-by=/usr/share/keyrings/rabbitmq.9F4587F226208342.gpg] https://ppa1.novemberain.com/rabbitmq/rabbitmq-server/deb/ubuntu $distribution main
deb-src [signed-by=/usr/share/keyrings/rabbitmq.9F4587F226208342.gpg] https://ppa1.novemberain.com/rabbitmq/rabbitmq-server/deb/ubuntu $distribution main
deb [signed-by=/usr/share/keyrings/rabbitmq.9F4587F226208342.gpg] https://ppa2.novemberain.com/rabbitmq/rabbitmq-server/deb/ubuntu $distribution main
deb-src [signed-by=/usr/share/keyrings/rabbitmq.9F4587F226208342.gpg] https://ppa2.novemberain.com/rabbitmq/rabbitmq-server/deb/ubuntu $distribution main
sudo rabbitmq-plugins enable rabbitmq_management
Добавим пользователя, под которым мы будем заходить:
sudo rabbitmqctl add_user rabbitmqadmin ваш_пароль
И выдадим ему все права:
sudo rabbitmqctl set_user_tags rabbitmqadmin administrator
sudo rabbitmqctl set_permissions -p / rabbitmqadmin ".*" ".*" ".*"
Теперь мы можем зайти на адрес <ваш-IP>:15672
, ввести логин и пароль и увидеть админку.
Windows
Для установки RabbitMQ на Windows также есть официальная инструкция, в которой описан процесс установки. Можно использовать Chocolatey или простой установщик. Установка этого брокера сообщений на Windows может быть полезна, например, для локальной отладки приложений. Но сначала нам необходимо установить Erlang/OTP с официального сайта Erlang.
Запустите скачанный файл и пройдите процесс установки.
После установки Erlang ставим сам Rabbit, например, через файл установки с официального сайта:
Далее включим UI:
cd 'C:\Program Files\RabbitMQ Server\rabbitmq_server-3.13.0\sbin'
./rabbitmq-plugins.bat enable rabbitmq_management
По адресу localhost:15672
и с данными для входа guest
:guest
мы можем попасть в Web UI.
Astra Linux
Astra Linux — сертифицированный российский дистрибутив Linux, основанный на Debian. Разберём, как установить RabbitMQ на эту ОС. Мы будем использовать Astra Linux CE 2.12, доступный для установки в Timeweb Cloud.
В менеджер пакетов APT уже добавлены нужные нам ключи и источники ПО, поэтому RabbitMQ можно просто поставить командой:
sudo apt install -y rabbitmq-server
Включаем плагин для Web UI:
sudo rabbitmq-plugins enable rabbitmq_management
Добавляем пользователя admin
по аналогии с Ubuntu:
sudo rabbitmqctl add_user rabbitmqadmin ваш_пароль
sudo rabbitmqctl set_user_tags rabbitmqadmin administrator
sudo rabbitmqctl set_permissions -p / rabbitmqadmin ".*" ".*" ".*"
По адресу ваш_ip:15672
будет доступна панель администратора.
Docker
Вероятно, самый простой и удобный вариант установки RabbitMQ. Мы можем поставить RabbitMQ, используя команду docker run
:
docker run --rm -p 15672:15672 rabbitmq:3.13.0-management
На порту 15672 будет доступна админка, в которой можно авторизоваться с помощью данных для входа guest
:guest
.
Однако такой вариант запуска не подходит для production-среды — докер назначает имя сервера для контейнера, а брокер сообщений хранит своё состояние в папке с этим названием. При каждой новой сборке контейнера Рэббит будет терять информацию о стейте.
С использованием docker-compose
мы сможем удобно описать все необходимые характеристики сервиса: сменить стандартные логин и пароль, примонтировать папку с состоянием и т.д. Добавим файл docker-compose.yml
(это можно сделать на вашем сервере, где установлен Docker, или на вашем ПК). Вот какое может быть содержимое у этого файла:
version: "3.3"
services:
rabbit:
image: rabbitmq:3.13.0-management
environment:
- RABBITMQ_DEFAULT_USER=admin #укажите имя пользователя
- RABBITMQ_DEFAULT_PASS=password #укажите сложный пароль
volumes:
- ./rabbit:/var/lib/rabbitmq #монтируем папку с состоянием
ports:
- 15672:15672 #открываем порт для админки
Запускаем с помощью команды:
docker compose up -d
И нам снова доступна админка.
Разберём основные возможности RabbitMQ и его панели администрирования. На главной странице можно посмотреть общую информацию: узлы, их состояние, общее количество сообщений и т.д.
Connections — это список подключений к кластеру:
Channels — список каналов. В рамках одного подключения можно создавать несколько каналов, через которые отправляются сообщения. Это сделано для того, чтобы не создавать множество TCP-соединений.
Обменники рассмотрим подробнее.
Они имеют бывают разных видов, которые отличаются механизмом правил фильтрации сообщений — на основе этих правил события попадают в конкретные очереди.
У каждого сообщения есть некоторый ключ — так называемый ключ маршрутизации (Routing Key). Эти сообщения попадут в те очереди, где в привязке к Exchange указан такой же ключ.
Пример настройки:
Указываем RoutingKey при создании обменника:
В созданном обменнике отправляем сообщение, указав такой же RoutingKey.
Маршрутизация по некоторому ключу-шаблону. При создании шаблона могут использоваться 0 или более слов (латинские буквы в разных регистрах и цифры), с разделителем в виде точки (например, «key.event»), а также символы #
и *
.
Правила отсутствуют. Каждое отправленное сообщение попадает во все очереди.
Использует заголовки байндинга и сообщения, сравнивает пары «ключ-значение» из этих заголовков.
В отображении конкретной очереди можно увидеть график, где отображается, сколько сообщений находится в ней, статистика по времени доставки и принятия этих сообщений. Сообщения могут быть в двух статусах: Ready — ждут обработки, Unacked — в процессе обработки консьюмером.
Разберём, как использовать RabbitMQ для реализации Pub-Sub на Python. У вас должен быть установлен Python. При написании этой статьи была использована версия 3.11.5.
Мы будем использовать библиотеку Pika. Есть и другие библиотеки, например, rabbitpy или amqp-lib, но они очень давно не обновлялись.
Выберите или создайте папку, в которой будет находиться код приложения, например:
mkdir rabbitmq-article
Откройте эту папку в вашей IDE. Я использую Visual Studio Code, также можно использовать PyCharm или другие среды разработки.
Установим библиотеку Pika с помощью команды в терминале:
pip install pika
Добавим два файла: sender.py
и receiver.py
:
В файле sender.py
опишем код для Producer, который будет отправлять сообщения. Для этого нужно:
Создать подключение к брокеру
Создать новый канал с помощью этого подключения
Создать новый обменник
Создать очередь
Привязать очередь к обменнику
Выполнить функцию публикации сообщения в созданную очередь
Код sender.py
:
from pika import BlockingConnection, ConnectionParameters
from pika.exchange_type import ExchangeType
# создаём подключение, указав параметры в объекте ConnectionParameters
connection = BlockingConnection(ConnectionParameters(host='localhost'))
# создаём канал
channel = connection.channel()
# создаём обменник
channel.exchange_declare('new_exchange', ExchangeType.direct)
# определяем очередь
queue = channel.queue_declare(queue='new_queue')
# привязываем очередь к обменнику
channel.queue_bind(exchange='new_exchange', queue='new_queue', routing_key='key')
# публикуем сообщение
channel.basic_publish(exchange='new_exchange', routing_key='key', body='Hello World!')
print("Сообщение 'Hello World!' отправлено")
connection.close()
Теперь реализуем получение сообщения:
Добавим подключение и канал
Определим обменник и очередь с тем же названием, что и в файле sender.py
Привяжем очередь к обменнику
Опишем функцию, которая будет вызываться при получении нового сообщения
Вызовем метод start_consuming
для того, чтобы запустить ожидание сообщений
Код receiver.py
:
from pika import BlockingConnection, ConnectionParameters
from pika.exchange_type import ExchangeType
import sys, os
def main():
# создаём подключение
connection = BlockingConnection(ConnectionParameters(host='localhost'))
# создаём канал
channel = connection.channel()
# создаём обменник
channel.exchange_declare('new_exchange', ExchangeType.direct)
# определяем очередь
queue = channel.queue_declare(queue='new_queue')
# привязываем очередь к обменнику
channel.queue_bind(exchange='new_exchange', queue='new_queue', routing_key='key')
# функция, которая вызывается при получении сообщения
def handle(ch, method, properties, body):
print(f"Получено сообщение: {body}")
# привязываем callback-функцию и очередь
channel.basic_consume(queue='new_queue', on_message_callback=callback, auto_ack=True)
print('Ожидание сообщения. Чтобы завершить работу приложения, нажмите ctrl+c')
channel.start_consuming()
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
try:
sys.exit(0)
except SystemExit:
os._exit(0)
Сначала запустим receiver
:
Далее запускаем sender
:
И в терминале, где запущен receiver
, мы видим сообщение:
В RabbitMQ Management мы можем зайти в созданный обменник и увидеть привязку к очереди по Routing key:
В этой статье мы рассмотрели, что такое брокеры сообщений, как приложения работают с ними, а также научились устанавливать и использовать брокер сообщений RabbitMQ.