Давайте дружить в Телеграме: рассказываем про новые фичи, общаемся в комментах, прислушиваемся к вашим идеям Подписаться

Установка, настройка и использование RabbitMQ

Дмитрий Бахтенков
Дмитрий Бахтенков
Технический писатель
10 апреля 2024 г.
247
12 минут чтения
Средний рейтинг статьи: 5

Брокеры сообщений — это приложения-посредники, используемые в различных видах архитектуры программного обеспечения, например, в микросервисах. Брокеры пересылают информацию в виде сообщений от сервиса к сервису. 

С брокерами сообщений тесно связан паттерн «publisher-subscriber», суть которого заключается в передаче некоторых событий от так называемых «издателей» (Producers) к потребителям (Consumers). 

C8a108f0 E9fd 4461 8d57 4999d1c8b1e5

Чаще всего в реализации этого паттерна участвует промежуточное звено — тот самый брокер сообщений. 

Image31

Для простоты можно провести аналогию. Представим YouTube: там есть каналы, у которых можно подписаться на уведомления. Далее происходит событие — публикация нового видео. Мы подписаны на это событие и получаем уведомление о том, что видео вышло.

7e626dbb C1e3 4ded B9d4 9b62fa5530d5

Как работают приложения с брокерами сообщений?

Тут нас будут интересовать три главных термина: Exchange, Queue, Binding.

Exchange 

Сюда попадают сообщения-события. Exchange решает, в какую очередь какие события попадут. Это происходит на основе связей — байндингов, о которых будет сказано далее. На схеме отметим Exchange вот таким способом:

Image32

Queue 

Или просто очередь. Структура данных, получение которых построено по принципу «первый вошёл — первый вышел», или FIFO. Данные хранятся на диске или в ОЗУ и представляют непосредственно сообщения. Очередь отдает копии этих данных потребителям (consumers). Схематично изобразим:

Image26

Binding 

Переводится как «привязка». Это набор правил, которые указывают Exchange, в какую из очередей должны попадать сообщения. Между Exchange и Queue может быть несколько привязок с разными параметрами.

Image17

Принцип работы 

Producer — это какой-то сервис, который генерирует сообщения и посылает их брокеру. Consumer — это другой сервис, который при получении сообщения начинает его обработку. В случае той же аналогии с Ютубом, «издатель» — это сам YouTube, а «потребитель» — это ваш телефон, получающий пуш-уведомления

  1. Producer отправляет сообщение, и оно попадает в Exchange.

Image8

  1. Очередь хранит это сообщение — например, на диске.

Image10

  1. Когда consumer готов принять сообщение (закончил обработку предыдущего или просто приложение-обработчик событий запустилось) — сервер посылает в него скопированные данные из очереди.

 Image29

  1. Consumer получает сообщение, затем как-то обрабатывает его и передаёт брокеру подтверждение (ACK).

Image27

  1. После получения брокером подтверждения сообщение удаляется.

Image40

RabbitMQ

Это популярный open-source брокер сообщений. Он базируется на основе протокола AMQP (Advanced Message Queuing Protocol) — это открытый протокол для передачи сообщений-событий через специальный брокер. Сам протокол имеет очень много возможностей, и помимо RabbitMQ его реализует, например, Apache Qpid

Сам «кролик» разработан на языке программирования Erlang, а его основные преимущества - пропускная способность и максимальная гибкость маршрутизации. Для «кролика» существует множество вариантов настройки правил того, какие сообщения и куда будут попадать на обработку. 

Подключаемся к Рэббиту мы по протоколу TCP: клиент инициализирует подключение по адресу хоста и сохраняет соединение до тех пор, пока ему необходимо взаимодействовать с RabbitMQ. Он также поддерживает аутентификацию, например, по логину и паролю

Установка на разные ОС

RabbitMQ состоит из двух частей: сервер и WebUI (админка). В UI можно смотреть, что, собственно, происходит в нашем брокере: живы ли узлы в кластере, сколько сообщений в процессе обработки и т.д. Давайте разберёмся, как это всё устанавливать на разные ОС.

Ubuntu / Debian

Для установки Rabbit на Ubuntu или Debian есть готовая инструкция на официальном сайте. Можно просто выполнить следующий скрипт в командной строке. Разберём его подробнее:

  • Установка стандартных зависимостей

sudo apt-get update -y
sudo apt-get install curl gnupg -y
sudo apt-get install apt-transport-https
  • Добавляем ключи репозиториев

curl -1sLf "https://keys.openpgp.org/vks/v1/by-fingerprint/0A9AF2115F4687BD29803A206B73A36E6026DFCA" | sudo gpg --dearmor | sudo tee /usr/share/keyrings/com.rabbitmq.team.gpg > /dev/null

curl -1sLf https://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq-erlang.E495BB49CC4BBE5B.key | sudo gpg --dearmor | sudo tee /usr/share/keyrings/rabbitmq.E495BB49CC4BBE5B.gpg > /dev/null

curl -1sLf https://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq-server.9F4587F226208342.key | sudo gpg --dearmor | sudo tee /usr/share/keyrings/rabbitmq.9F4587F226208342.gpg > /dev/null

deb [signed-by=/usr/share/keyrings/rabbitmq.E495BB49CC4BBE5B.gpg] https://ppa1.novemberain.com/rabbitmq/rabbitmq-erlang/deb/ubuntu $distribution main

deb-src [signed-by=/usr/share/keyrings/rabbitmq.E495BB49CC4BBE5B.gpg] https://ppa1.novemberain.com/rabbitmq/rabbitmq-erlang/deb/ubuntu $distribution main

deb [signed-by=/usr/share/keyrings/rabbitmq.E495BB49CC4BBE5B.gpg] https://ppa2.novemberain.com/rabbitmq/rabbitmq-erlang/deb/ubuntu $distribution main

deb-src [signed-by=/usr/share/keyrings/rabbitmq.E495BB49CC4BBE5B.gpg] https://ppa2.novemberain.com/rabbitmq/rabbitmq-erlang/deb/ubuntu $distribution main

deb [signed-by=/usr/share/keyrings/rabbitmq.9F4587F226208342.gpg] https://ppa1.novemberain.com/rabbitmq/rabbitmq-server/deb/ubuntu $distribution main

deb-src [signed-by=/usr/share/keyrings/rabbitmq.9F4587F226208342.gpg] https://ppa1.novemberain.com/rabbitmq/rabbitmq-server/deb/ubuntu $distribution main

deb [signed-by=/usr/share/keyrings/rabbitmq.9F4587F226208342.gpg] https://ppa2.novemberain.com/rabbitmq/rabbitmq-server/deb/ubuntu $distribution main

deb-src [signed-by=/usr/share/keyrings/rabbitmq.9F4587F226208342.gpg] https://ppa2.novemberain.com/rabbitmq/rabbitmq-server/deb/ubuntu $distribution main
  • После выполнения скрипта нам необходимо включить UI. Выполним следующие команды:
sudo rabbitmq-plugins enable rabbitmq_management

Image18

  • Добавим пользователя, под которым мы будем заходить:

sudo rabbitmqctl add_user rabbitmqadmin ваш_пароль
  • И выдадим ему все права:

sudo rabbitmqctl set_user_tags rabbitmqadmin administrator
sudo rabbitmqctl set_permissions -p / rabbitmqadmin ".*" ".*" ".*"

Теперь мы можем зайти на адрес <ваш-IP>:15672, ввести логин и пароль и увидеть админку.

Image21

Windows

Для установки RabbitMQ на Windows также есть официальная инструкция, в которой описан процесс установки. Можно использовать Chocolatey или простой установщик. Установка этого брокера сообщений на Windows может быть полезна, например, для локальной отладки приложений. Но сначала нам необходимо установить Erlang/OTP с официального сайта Erlang

Image25

Запустите скачанный файл и пройдите процесс установки.

Image14

После установки Erlang ставим сам Rabbit, например, через файл установки с официального сайта:

Image35

Далее включим UI:

cd 'C:\Program Files\RabbitMQ Server\rabbitmq_server-3.13.0\sbin'
./rabbitmq-plugins.bat enable rabbitmq_management

Image20

По адресу localhost:15672 и с данными для входа guest:guest мы можем попасть в Web UI.

301b15e2 4536 467d Bb17 Cf411f5b0299

Astra Linux

Astra Linux — сертифицированный российский дистрибутив Linux, основанный на Debian. Разберём, как установить RabbitMQ на эту ОС. Мы будем использовать Astra Linux CE 2.12, доступный для установки в Timeweb Cloud.

Image41

В менеджер пакетов APT уже добавлены нужные нам ключи и источники ПО, поэтому RabbitMQ можно просто поставить командой:

sudo apt install -y rabbitmq-server

Image12

Включаем плагин для Web UI:

sudo rabbitmq-plugins enable rabbitmq_management

Добавляем пользователя admin по аналогии с Ubuntu:

sudo rabbitmqctl add_user rabbitmqadmin ваш_пароль
sudo rabbitmqctl set_user_tags rabbitmqadmin administrator
sudo rabbitmqctl set_permissions -p / rabbitmqadmin ".*" ".*" ".*"

По адресу ваш_ip:15672 будет доступна панель администратора.

Image5

Docker

Вероятно, самый простой и удобный вариант установки RabbitMQ. Мы можем поставить RabbitMQ, используя команду docker run:

docker run --rm -p 15672:15672 rabbitmq:3.13.0-management

Image3

На порту 15672 будет доступна админка, в которой можно авторизоваться с помощью данных для входа guest:guest.

Однако такой вариант запуска не подходит для production-среды — докер назначает имя сервера для контейнера, а брокер сообщений хранит своё состояние в папке с этим названием. При каждой новой сборке контейнера Рэббит будет терять информацию о стейте.

С использованием docker-compose мы сможем удобно описать все необходимые характеристики сервиса: сменить стандартные логин и пароль, примонтировать папку с состоянием и т.д. Добавим файл docker-compose.yml (это можно сделать на вашем сервере, где установлен Docker, или на вашем ПК). Вот какое может быть содержимое у этого файла:

version: "3.3"
services:
  rabbit:
    image: rabbitmq:3.13.0-management
    environment:
      - RABBITMQ_DEFAULT_USER=admin #укажите имя пользователя
      - RABBITMQ_DEFAULT_PASS=password #укажите сложный пароль
    volumes:
      - ./rabbit:/var/lib/rabbitmq #монтируем папку с состоянием
    ports:
      - 15672:15672 #открываем порт для админки

Запускаем с помощью команды:

docker compose up -d

Image6

И нам снова доступна админка.

Обзор RabbitMQ Management

Разберём основные возможности RabbitMQ и его панели администрирования. На главной странице можно посмотреть общую информацию: узлы, их состояние, общее количество сообщений и т.д.

Image5

Connections — это список подключений к кластеру:

Image23

Channels — список каналов. В рамках одного подключения можно создавать несколько каналов, через которые отправляются сообщения. Это сделано для того, чтобы не создавать множество TCP-соединений.

Image9

Exchanges

Обменники рассмотрим подробнее.

Image1

Они имеют бывают разных видов, которые отличаются механизмом правил фильтрации сообщений — на основе этих правил события попадают в конкретные очереди.

  • Direct Exchange

У каждого сообщения есть некоторый ключ — так называемый ключ маршрутизации (Routing Key). Эти сообщения попадут в те очереди, где в привязке к Exchange указан такой же ключ.

Image24

Пример настройки:

Указываем RoutingKey при создании обменника:

Image39

В созданном обменнике отправляем сообщение, указав такой же RoutingKey.

Image36

  • Topic Exchange

Маршрутизация по некоторому ключу-шаблону. При создании шаблона могут использоваться 0 или более слов (латинские буквы в разных регистрах и цифры), с разделителем в виде точки (например, «key.event»), а также символы # и *.

Image11

  • Fanout Exchange

Правила отсутствуют. Каждое отправленное сообщение попадает во все очереди.

Image34

  • Headers Exchange

Использует заголовки байндинга и сообщения, сравнивает пары «ключ-значение» из этих заголовков.

Image37

Очереди

Image13

В отображении конкретной очереди можно увидеть график, где отображается, сколько сообщений находится в ней, статистика по времени доставки и принятия этих сообщений. Сообщения могут быть в двух статусах: Ready — ждут обработки, Unacked — в процессе обработки консьюмером. 

Image33

Реализуем Publisher-Subscriber на Python

Разберём, как использовать RabbitMQ для реализации Pub-Sub на Python. У вас должен быть установлен Python. При написании этой статьи была использована версия 3.11.5.

Мы будем использовать библиотеку Pika. Есть и другие библиотеки, например, rabbitpy или amqp-lib, но они очень давно не обновлялись.

Выберите или создайте папку, в которой будет находиться код приложения, например:

mkdir rabbitmq-article

Откройте эту папку в вашей IDE. Я использую Visual Studio Code, также можно использовать PyCharm или другие среды разработки.

Установим библиотеку Pika с помощью команды в терминале:

pip install pika 

Image28

Добавим два файла: sender.py и receiver.py:

Image30

В файле sender.py опишем код для Producer, который будет отправлять сообщения. Для этого нужно:

  • Создать подключение к брокеру

  • Создать новый канал с помощью этого подключения

  • Создать новый обменник

  • Создать очередь

  • Привязать очередь к обменнику

  • Выполнить функцию публикации сообщения в созданную очередь

Код sender.py:

from pika import BlockingConnection, ConnectionParameters
from pika.exchange_type import ExchangeType

# создаём подключение, указав параметры в объекте ConnectionParameters 
connection = BlockingConnection(ConnectionParameters(host='localhost'))

# создаём канал
channel = connection.channel()

# создаём обменник
channel.exchange_declare('new_exchange', ExchangeType.direct)

# определяем очередь
queue = channel.queue_declare(queue='new_queue')

# привязываем очередь к обменнику
channel.queue_bind(exchange='new_exchange', queue='new_queue', routing_key='key')

# публикуем сообщение
channel.basic_publish(exchange='new_exchange', routing_key='key', body='Hello World!')

print("Сообщение 'Hello World!' отправлено")
connection.close()

Теперь реализуем получение сообщения:

  • Добавим подключение и канал

  • Определим обменник и очередь с тем же названием, что и в файле sender.py

  • Привяжем очередь к обменнику

  • Опишем функцию, которая будет вызываться при получении нового сообщения

  • Вызовем метод start_consuming для того, чтобы запустить ожидание сообщений

Код receiver.py:

from pika import BlockingConnection, ConnectionParameters
from pika.exchange_type import ExchangeType
import sys, os

def main():
    # создаём подключение
    connection = BlockingConnection(ConnectionParameters(host='localhost'))
    # создаём канал
    channel = connection.channel()

    # создаём обменник
    channel.exchange_declare('new_exchange', ExchangeType.direct)

    # определяем очередь
    queue = channel.queue_declare(queue='new_queue')

    # привязываем очередь к обменнику
    channel.queue_bind(exchange='new_exchange', queue='new_queue', routing_key='key')

    # функция, которая вызывается при получении сообщения
    def handle(ch, method, properties, body):
        print(f"Получено сообщение: {body}")

    # привязываем callback-функцию и очередь
    channel.basic_consume(queue='new_queue', on_message_callback=callback, auto_ack=True)

    print('Ожидание сообщения. Чтобы завершить работу приложения, нажмите ctrl+c')
    channel.start_consuming()

if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)

Сначала запустим receiver:

Image16

Далее запускаем sender:

Bcab1a57 6e16 4e38 9068 E469caa29ff1

И в терминале, где запущен receiver, мы видим сообщение:

Image15

В RabbitMQ Management мы можем зайти в созданный обменник и увидеть привязку к очереди по Routing key:

Image2

Заключение

В этой статье мы рассмотрели, что такое брокеры сообщений, как приложения работают с ними, а также научились устанавливать и использовать брокер сообщений RabbitMQ.

Зарегистрируйтесь и начните пользоваться
сервисами Timeweb Cloud прямо сейчас

15 лет опыта
Сосредоточьтесь на своей работе: об остальном позаботимся мы
165 000 клиентов
Нам доверяют частные лица и компании, от небольших фирм до корпораций
Поддержка 24/7
100+ специалистов поддержки, готовых помочь в чате, тикете и по телефону