Брокеры сообщений — это приложения-посредники, используемые в различных видах архитектуры программного обеспечения, например, в микросервисах. Брокеры пересылают информацию в виде сообщений от сервиса к сервису.
С брокерами сообщений тесно связан паттерн «publisher-subscriber», суть которого заключается в передаче некоторых событий от так называемых «издателей» (Producers) к потребителям (Consumers).
Чаще всего в реализации этого паттерна участвует промежуточное звено — тот самый брокер сообщений.
Для простоты можно провести аналогию. Представим YouTube: там есть каналы, у которых можно подписаться на уведомления. Далее происходит событие — публикация нового видео. Мы подписаны на это событие и получаем уведомление о том, что видео вышло.
Тут нас будут интересовать три главных термина: Exchange, Queue, Binding.
Сюда попадают сообщения-события. Exchange решает, в какую очередь какие события попадут. Это происходит на основе связей — байндингов, о которых будет сказано далее. На схеме отметим Exchange вот таким способом:
Или просто очередь. Структура данных, получение которых построено по принципу «первый вошёл — первый вышел», или FIFO. Данные хранятся на диске или в ОЗУ и представляют непосредственно сообщения. Очередь отдает копии этих данных потребителям (consumers). Схематично изобразим:
Переводится как «привязка». Это набор правил, которые указывают Exchange, в какую из очередей должны попадать сообщения. Между Exchange и Queue может быть несколько привязок с разными параметрами.
Producer — это какой-то сервис, который генерирует сообщения и посылает их брокеру. Consumer — это другой сервис, который при получении сообщения начинает его обработку. В случае той же аналогии с Ютубом, «издатель» — это сам YouTube, а «потребитель» — это ваш телефон, получающий пуш-уведомления
Producer отправляет сообщение, и оно попадает в Exchange.
Очередь хранит это сообщение — например, на диске.
Когда consumer готов принять сообщение (закончил обработку предыдущего или просто приложение-обработчик событий запустилось) — сервер посылает в него скопированные данные из очереди.
Consumer получает сообщение, затем как-то обрабатывает его и передаёт брокеру подтверждение (ACK).
После получения брокером подтверждения сообщение удаляется.
Это популярный open-source брокер сообщений. Он базируется на основе протокола AMQP (Advanced Message Queuing Protocol) — это открытый протокол для передачи сообщений-событий через специальный брокер. Сам протокол имеет очень много возможностей, и помимо RabbitMQ его реализует, например, Apache Qpid.
Сам «кролик» разработан на языке программирования Erlang, а его основные преимущества - пропускная способность и максимальная гибкость маршрутизации. Для «кролика» существует множество вариантов настройки правил того, какие сообщения и куда будут попадать на обработку.
Подключаемся к Рэббиту мы по протоколу TCP: клиент инициализирует подключение по адресу хоста и сохраняет соединение до тех пор, пока ему необходимо взаимодействовать с RabbitMQ. Он также поддерживает аутентификацию, например, по логину и паролю
RabbitMQ состоит из двух частей: сервер и WebUI (админка). В UI можно смотреть, что, собственно, происходит в нашем брокере: живы ли узлы в кластере, сколько сообщений в процессе обработки и т.д. Давайте разберёмся, как это всё устанавливать на разные ОС.
Для установки Rabbit на Ubuntu или Debian есть готовая инструкция на официальном сайте. Можно просто выполнить следующий скрипт в командной строке. Разберём его подробнее:
Установка стандартных зависимостей
sudo apt-get update -y sudo apt-get install curl gnupg -y sudo apt-get install apt-transport-https
Добавляем ключи репозиториев
curl -1sLf "https://keys.openpgp.org/vks/v1/by-fingerprint/0A9AF2115F4687BD29803A206B73A36E6026DFCA" | sudo gpg --dearmor | sudo tee /usr/share/keyrings/com.rabbitmq.team.gpg > /dev/null curl -1sLf https://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq-erlang.E495BB49CC4BBE5B.key | sudo gpg --dearmor | sudo tee /usr/share/keyrings/rabbitmq.E495BB49CC4BBE5B.gpg > /dev/null curl -1sLf https://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq-server.9F4587F226208342.key | sudo gpg --dearmor | sudo tee /usr/share/keyrings/rabbitmq.9F4587F226208342.gpg > /dev/null deb [signed-by=/usr/share/keyrings/rabbitmq.E495BB49CC4BBE5B.gpg] https://ppa1.novemberain.com/rabbitmq/rabbitmq-erlang/deb/ubuntu $distribution main deb-src [signed-by=/usr/share/keyrings/rabbitmq.E495BB49CC4BBE5B.gpg] https://ppa1.novemberain.com/rabbitmq/rabbitmq-erlang/deb/ubuntu $distribution main deb [signed-by=/usr/share/keyrings/rabbitmq.E495BB49CC4BBE5B.gpg] https://ppa2.novemberain.com/rabbitmq/rabbitmq-erlang/deb/ubuntu $distribution main deb-src [signed-by=/usr/share/keyrings/rabbitmq.E495BB49CC4BBE5B.gpg] https://ppa2.novemberain.com/rabbitmq/rabbitmq-erlang/deb/ubuntu $distribution main deb [signed-by=/usr/share/keyrings/rabbitmq.9F4587F226208342.gpg] https://ppa1.novemberain.com/rabbitmq/rabbitmq-server/deb/ubuntu $distribution main deb-src [signed-by=/usr/share/keyrings/rabbitmq.9F4587F226208342.gpg] https://ppa1.novemberain.com/rabbitmq/rabbitmq-server/deb/ubuntu $distribution main deb [signed-by=/usr/share/keyrings/rabbitmq.9F4587F226208342.gpg] https://ppa2.novemberain.com/rabbitmq/rabbitmq-server/deb/ubuntu $distribution main deb-src [signed-by=/usr/share/keyrings/rabbitmq.9F4587F226208342.gpg] https://ppa2.novemberain.com/rabbitmq/rabbitmq-server/deb/ubuntu $distribution main
sudo rabbitmq-plugins enable rabbitmq_management
Добавим пользователя, под которым мы будем заходить:
sudo rabbitmqctl add_user rabbitmqadmin ваш_пароль
И выдадим ему все права:
sudo rabbitmqctl set_user_tags rabbitmqadmin administrator
sudo rabbitmqctl set_permissions -p / rabbitmqadmin ".*" ".*" ".*"
Теперь мы можем зайти на адрес <ваш-IP>:15672
, ввести логин и пароль и увидеть админку.
Разберём основные возможности RabbitMQ и его панели администрирования. На главной странице можно посмотреть общую информацию: узлы, их состояние, общее количество сообщений и т.д.
Connections — это список подключений к кластеру:
Channels — список каналов. В рамках одного подключения можно создавать несколько каналов, через которые отправляются сообщения. Это сделано для того, чтобы не создавать множество TCP-соединений.
Обменники рассмотрим подробнее.
Они имеют бывают разных видов, которые отличаются механизмом правил фильтрации сообщений — на основе этих правил события попадают в конкретные очереди.
У каждого сообщения есть некоторый ключ — так называемый ключ маршрутизации (Routing Key). Эти сообщения попадут в те очереди, где в привязке к Exchange указан такой же ключ.
Пример настройки:
Указываем RoutingKey при создании обменника:
В созданном обменнике отправляем сообщение, указав такой же RoutingKey.
Маршрутизация по некоторому ключу-шаблону. При создании шаблона могут использоваться 0 или более слов (латинские буквы в разных регистрах и цифры), с разделителем в виде точки (например, «key.event»), а также символы #
и *
.
Правила отсутствуют. Каждое отправленное сообщение попадает во все очереди.
Использует заголовки байндинга и сообщения, сравнивает пары «ключ-значение» из этих заголовков.
В отображении конкретной очереди можно увидеть график, где отображается, сколько сообщений находится в ней, статистика по времени доставки и принятия этих сообщений. Сообщения могут быть в двух статусах: Ready — ждут обработки, Unacked — в процессе обработки консьюмером.
Разберём, как использовать RabbitMQ для реализации Pub-Sub на Python. У вас должен быть установлен Python. При написании этой статьи была использована версия 3.11.5.
Мы будем использовать библиотеку Pika. Есть и другие библиотеки, например, rabbitpy или amqp-lib, но они очень давно не обновлялись.
Выберите или создайте папку, в которой будет находиться код приложения, например:
mkdir rabbitmq-article
Откройте эту папку в вашей IDE. Я использую Visual Studio Code, также можно использовать PyCharm или другие среды разработки.
Установим библиотеку Pika с помощью команды в терминале:
pip install pika
Добавим два файла: sender.py
и receiver.py
:
В файле sender.py
опишем код для Producer, который будет отправлять сообщения. Для этого нужно:
Создать подключение к брокеру
Создать новый канал с помощью этого подключения
Создать новый обменник
Создать очередь
Привязать очередь к обменнику
Выполнить функцию публикации сообщения в созданную очередь
Код sender.py
:
from pika import BlockingConnection, ConnectionParameters
from pika.exchange_type import ExchangeType
# создаём подключение, указав параметры в объекте ConnectionParameters
connection = BlockingConnection(ConnectionParameters(host='localhost'))
# создаём канал
channel = connection.channel()
# создаём обменник
channel.exchange_declare('new_exchange', ExchangeType.direct)
# определяем очередь
queue = channel.queue_declare(queue='new_queue')
# привязываем очередь к обменнику
channel.queue_bind(exchange='new_exchange', queue='new_queue', routing_key='key')
# публикуем сообщение
channel.basic_publish(exchange='new_exchange', routing_key='key', body='Hello World!')
print("Сообщение 'Hello World!' отправлено")
connection.close()
Теперь реализуем получение сообщения:
Добавим подключение и канал
Определим обменник и очередь с тем же названием, что и в файле sender.py
Привяжем очередь к обменнику
Опишем функцию, которая будет вызываться при получении нового сообщения
Вызовем метод start_consuming
для того, чтобы запустить ожидание сообщений
Код receiver.py
:
from pika import BlockingConnection, ConnectionParameters
from pika.exchange_type import ExchangeType
import sys, os
def main():
# создаём подключение
connection = BlockingConnection(ConnectionParameters(host='localhost'))
# создаём канал
channel = connection.channel()
# создаём обменник
channel.exchange_declare('new_exchange', ExchangeType.direct)
# определяем очередь
queue = channel.queue_declare(queue='new_queue')
# привязываем очередь к обменнику
channel.queue_bind(exchange='new_exchange', queue='new_queue', routing_key='key')
# функция, которая вызывается при получении сообщения
def handle(ch, method, properties, body):
print(f"Получено сообщение: {body}")
# привязываем callback-функцию и очередь
channel.basic_consume(queue='new_queue', on_message_callback=callback, auto_ack=True)
print('Ожидание сообщения. Чтобы завершить работу приложения, нажмите ctrl+c')
channel.start_consuming()
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
try:
sys.exit(0)
except SystemExit:
os._exit(0)
Сначала запустим receiver
:
Далее запускаем sender
:
И в терминале, где запущен receiver
, мы видим сообщение:
В RabbitMQ Management мы можем зайти в созданный обменник и увидеть привязку к очереди по Routing key:
В этой статье мы рассмотрели, что такое брокеры сообщений, как приложения работают с ними, а также научились устанавливать и использовать брокер сообщений RabbitMQ.