Давайте дружить в Телеграме: рассказываем про новые фичи, общаемся в комментах, прислушиваемся к вашим идеям Подписаться

Как реализовать очередь в Redis

Миша Курушин
Миша Курушин
Технический писатель
19 апреля 2024 г.
26
12 минут чтения
Средний рейтинг статьи: 5

Redis — база данных, которая хранит и обрабатывает информацию преимущественно в оперативной памяти.

За счет этой особенности Redis принято использовать для работы с «быстрыми» данными — теми, которые имеют определенный (часто небольшой) срок жизни.

Один из таких типов данных — кэш. Таким образом, Redis выступает в качестве кэширующей базы данных.

Однако, другой вариант использования Redis — обработка очереди сообщений в качестве брокера и хранилища сообщений. В этом случае Redis повторяет функционал таких инструментов, как Apache Kafka или RabbitMQ.

Алгоритм работы брокера сообщений выглядит примерно так:

  • Сервис 1 (например, микросервис на backend-стороне инфраструктуры приложения) передает сообщение брокеру.

  • Сервис 2 (возможно, также Сервис 3, 4, 5 и т.д.) извлекает сообщение из брокера и выполняет их обработку.

Все переданные брокеру сообщения буферизируются, формируя очередь сообщений.

За счет этого можно в один момент передать брокеру сразу несколько сообщений от одного или нескольких сервисов, после чего другие сервисы смогут извлечь интересующие их сообщения и обработать их за определенный промежуток времени.

Способы и инструменты реализации очереди

У Redis есть несколько встроенных инструментов (функций), которые позволяют реализовать механику очереди сообщений и их последовательную обработку. У каждого способа есть свои плюсы и минусы:

  • «Pub/Sub» (Публикация/Подписка). Один сервис (или несколько сервисов) публикует в отдельную очередь свое сообщение, после чего его могут обработать только те получатели, которые подписаны на эту очередь. В том случае, если на очередь никто не подписан, сообщение будет утеряно.

  • «List» (Список). Очередь типа FIFO (первым пришел — первым ушел). После отправки сервисом сообщения его получит только один подписчик.

  • «Stream» (Поток). Тоже самое, что и Pub/Sub, но с гарантией доставки сообщения. То есть при отсутствии чтения сообщение остается в очереди и ждет обработки.

Создание базы данных Redis в Timeweb Cloud

В этом руководстве будет использоваться облачная база данных Timeweb Cloud.

Соответственно, конфигурация и управления Redis будет выполняться через графический интерфейс панели управления Timeweb Cloud. Поэтому сперва нужно авторизоваться в ней.

Image1

Страница авторизации в Timeweb Cloud

После того, как откроется главная страница панели управления, необходимо найти в левом меню пункт «Базы данных» и кликнуть по нему.

Если ранее вы еще не создавали какие-либо базы данных на своем аккаунте, то на открывшейся странице будет кнопка «Создать», на которую нужно кликнуть.

Image2

Страница, на которой выводится список созданных облачных баз данных, либо, в случае их отсутствия, кнопка «Создать»

После этого откроется страница конфигурации базы данных.

Самый главный параметр — тип базы данных. В нашем случае выбирается «Redis» версии 6. Остальные настройки можно выставить по своему усмотрению.

На кнопке «Заказать» будет выводится стоимость месячной аренды базы данных. Для тестового проекта можно выбрать самую минимальную конфигурацию, чтобы снизить стоимость.

После завершения конфигурации жмем на кнопку «Заказать».

Image3

Страница конфигурации облачной базы данных

Откроется страница управления созданной базой данных. Некоторое время сервер будет неактивен — он будет запускаться и конфигурироваться.

Реализация очереди

Рассмотрим процесс реализации очереди в Redis пошагово.

Обновление системы

В этом руководстве использовался облачный сервер Timeweb Cloud под управлением операционной системы Ubuntu 22.04.

Перед началом конфигурации сервера под приложение Python сперва рекомендуется обновиться систему:

sudo apt update
sudo apt upgrade

Установка Python

Сперва следует убедиться, установлен ли в системе Python. Это можно сделать, запросив его версию:

python --version

В консоли появится примерно такой вывод:

Python 3.10.12

Обратите внимание, что в этом руководстве используется Python версии 3.10.12.

В случае, если в системе отсутствует Python, его следует установить через пакетный менеджер APT:

sudo apt install -y python3

Флаг -y необходим для автоматического ответа «yes» на все возникающие во время установки вопросы.

Установка виртуальной среды Python

Для работы приложения потребуется виртуальная среда Python. Поэтому устанавливаем ее:

sudo apt install python3-venv -y

Создание рабочего каталога

Создадим отдельную директория для проекта на Python:

mkdir project

И перейдем в нее:

cd project

Активация виртуальной среды

Перед началом написания кода нужно создать виртуальную среду Python в рабочем каталоге:

python -m venv venv

Теперь можно проверить состояние каталога:

ls

Если все прошло успешно, внутри появится папка виртуальной среды:

venv

Далее выполняется активация:

source ./venv/bin/activate

Установка пакетного менеджера Pip

Для работы с Redis потребуется специальный Python-модуль, через который будет выполняться подключение к серверу Redis и непосредственно управление базой данных.

Чтобы получить актуальную версию модуль, сперва необходимо установить пакетный менеджер Pip:

sudo apt install python3-pip -y

Для проверки корректности установки можно запросить версию Pip:

pip -V

В консоли должно появится примерно такое сообщение:

pip 22.0.2 from /usr/lib/python3/dist-packages/pip (python 3.10)

В этом руководстве использовался Pip версии 22.0.2.

Установка модуля Redis

Теперь можно установить Python-модуль для работы с Redis:

pip install redis

Впоследствии он будет импортироваться в коде приложения Python.

Написание приложения Python

Рассмотрим базовые способы создания очереди на основе Pub/Sub, List и Stream.

Очередь на основе Pub/Sub

В рабочем каталоге создадим файл обработчика, в котором будет читаться очередь сообщений:

sudo nano consumerPS.py

Код внутри файла такой:

import redis
import time

connection = redis.Redis(
	host="IP", # указываем IP-адрес сервера Redis
	password="PASSWORD", # указываем root-пароль сервера Redis
	port=6379, # стандартный порт для подключения к Redis без SSL
	db=0,
	decode_responses=True # ответы сервера Redis будут автоматически декодироваться в читаемый вид
)

queue = connection.pubsub() # создаем очередь типа Pub/Sub
queue.subscribe("channelFirst", "channelSecond") # подписываемся на указанные каналы

# бесконечный цикл обработки очереди сообщений
while True:
	time.sleep(0.01)
	msg = queue.get_message() # извлекаем сообщение
	if msg: # проверяем сообщение на пустоту
		if not isinstance(msg["data"], int): # проверяем, какой тип информации хранится в переменной data (msg имеет тип словаря)
			print(msg["data"]) # выводим сообщение в консоль

Сперва выполняется подключение к удаленному серверу Redis, после чего создается очередь типа Pub/Sub.

Обратите внимание, что при подключении к Redis нужно указать адрес удаленного хоста и root-пароль сервера. В этом примере используется подключение без использования SSL, поэтому указывается порт 6379.

Очередь подписывается на два канала — channelFirst и channelSecond.

Внутри бесконечного цикла с некоторым интервалом выполняется проверка наличия новых сообщений. Если в очереди есть сообщение — оно выводится в консоль.

Теперь можно создать Далее создадим файл отправителя, который будет продуцировать сообщения в очередь:

sudo nano producerPS.py

Его содержимое должно быть таким:

import redis

# аналогичное подключение к удаленному серверу Redis
connection = redis.Redis(
	host="IP",
	password="PASSWORD",
	port=6379,
	db=0,
	decode_responses=True
)

connection.publish('channelFirst', 'Данное сообщение было отправлено в первый канал') # отправляем сообщение в первый канал
connection.publish('channelSecond', 'Данное сообщение было отправлено во второй канал') # отправляем сообщение во второй канал

Сперва выполняется подключение к удаленному серверу Redis, аналогичное файлу subscriberPS.py. Далее по открытому соединению отправляется два сообщения — одно в первый канал, другое во второй.

Теперь можно последовательно запустить написанные скрипты и убедиться в работоспособности примера.

Сначала запускаем обработчик сообщений в уже открытом терминале консоли:

python consumerPS.py

Далее открываем второй консольный терминал и выполняем активацию среды:

source ./venv/bin/activate

После чего запускаем отправителя сообщений:

python producerPS.py

Соответственно в первом терминале появится следующий вывод:

Данное сообщение было отправлено в первый канал

Данное сообщение было отправлено во второй канал

Очередь на основе List

Теперь мы реализуем похожую очередь, но с использованием сущности List.

Сперва создадим файл обработчика:

sudo nano consumerList.py

И напишем в него следующий код:

import redis
import random
import time

connection = redis.Redis(
	host="IP",
	password="PASSWORD",
	port=6379,
	db=0,
	decode_responses=True
)

len = connection.llen("listQueue") # получаем размер листа очереди сообщений

# читаем сообщения из листа до тех пор, пока размер листа не станет равен нулю
while connection.llen("listQueue") != 0:
	msg = connection.rpop("listQueue") # читаем сообщение, которое является типом данных словаря
	if msg:
		print(msg) # выводим сообщение в консоль

Обратите внимание, что в этом примере мы извлекаем и удаляем сообщение «справа», а не «слева». То есть вместо функции lpop используется rpop.

Теперь реализуем файл отправителя:

sudo nano producerList.py

Содержимое будет такое:

import redis
import random

connection = redis.Redis(
	host="IP",
	password="PASSWORD",
	port=6379,
	db=0,
	decode_responses=True
)

# отправляем сразу 3 сообщения
for i in range(0,3):
	connection.lpush("listQueue", "Сообщение №" + str(random.randint(0, 100))) # добавляем в лист сообщение с уникальным номером

Важно отметить, что сообщения добавляются в лист справа налево.

Например, было отправлено 3 сообщения:

  • Сообщение №1

  • Сообщение №2

  • Сообщение №3

После этого лист очереди будет выглядеть так:

[ Сообщение №3, Сообщение №2, Сообщение №1 ]

Соответственно, если в коде обработчика сообщений используется функция rpop, то сообщения будут обрабатываться в порядке их отправки. Если же lpop — в порядке, обратном их отправке.

Тоже самое справедливо и для отправки сообщений с помощью функции rpush вместо функции lpush.

Запустим скрипт отправителя для заполнения листа очереди сообщений:

python producerList.py

После этого выполним обработку сообщений:

python consumerList.py

В консоли должен появится примерно такой вывод — отличаться будут только номера сообщений:

Сообщение №94
Сообщение №96
Сообщение №24

Очередь на основе Stream

Еще один полезный инструмент для реализации очереди — «Потоки».

Для управления потоками существует несколько базовых команд:

  • XADD. Добавляет новую запись в поток.

  • XREAD. Читает одну или несколько записей, начиная с заданной позиции и продвигаясь вперед во времени.

  • XRANGE. Возвращает диапазон записей между двумя предоставленными идентификаторами записей.

  • XLEN. Возвращает длину потока.

Создадим файл отправителя сообщений:

sudo nano producerStream.py

Код внутри следующий:

import redis
import random

connection = redis.Redis(
	host="IP",
	password="PASSWORD",
	port=6379,
	db=0,
	decode_responses=True
)

# отправляем сразу 3 сообщения
for i in range(0,3):
	connection.xadd("queueStream", { "data":"Сообщение №" + str(random.randint(0, 100))}) # добавляем в очередь сообщение с уникальным номером (тип словаря)

print("Длина очереди: " + str(connection.xlen("queueStream"))) # выводим в консоль размер очереди сообщений

В этом примере мы отправляем сразу 3 сообщения с уникальным номером в поток через цикл for. После отправки в терминал консоли выводится размер потока.

Теперь реализуем функционал обработчика сообщений:

sudo nano consumerStream.py

Код внутри файла следующий:

import redis
import random

connection = redis.Redis(
	host="IP",
	password="PASSWORD",
	port=6379,
	db=0,
	decode_responses=True
)

len = connection.xlen("queueStream") # узнаем длину потока

if len > 0:
	messages = connection.xread(count=len, streams={"queueStream":0}) # получаем весь список сообщений в потоке

	# перебираем список сообщений
	for msg in messages:
		print(msg) # выводим сообщение в консоль

Сперва мы целиком извлекаем очередь сообщений из потока, после чего последовательно обрабатываем ее в цикле for.

В аналогичном порядке запускаем написанные скрипты. Начинаем с отправителя сообщений:

python producerStream.py

В консоли должен появиться следующий вывод:

Длина очереди: 3

После чего выполняем обработку очереди сообщений:

python consumerStream.py

Соответственно, вывод в консоли будет примерно таким:

['queueStream', [('1711712995031-0', {'data': 'Сообщение №74'}), ('1711712995033-0', {'data': 'Сообщение №54'})]]

По этому выводу можно заметить, что у каждого сообщения есть свой уникальный идентификатор, автоматически присвоенный Redis.

Однако у этого примера есть один недостаток — каждый раз мы читаем один и тот же поток целиком.

Давайте усложним код в файле consumerStream.py, чтобы каждый новый запуск скрипта обработчика читал только новые сообщения потока:

import redis
import random

connection = redis.Redis(
	host="IP",
	password="PASSWORD",
	port=6379,
	db=0,
	decode_responses=True
)

# создаем переменную Redis для хранения ID последнего сообщения (если эта переменная еще не существует)
if connection.get("last") == None:
	connection.set("last", 0)

len = connection.xlen("queueStream") # узнаем длину потока

if len > 0:
	messages = connection.xread(count=len, block=1000, streams={"queueStream":connection.get("last")}) # в момент чтения передаем ID последнего сообщения в качестве аргумента (либо 0)

	print(connection.get("last")) # выводим в консоль ID последнего сообщения (либо 0)

	# перебираем список новых сообщений
	for msg in messages:
		print(msg) # выводим сообщение в консоль
		connection.set("last", msg[-1][-1][0]) # устанавливаем в качестве значения переменной ID последнего прочитанного сообщения

Теперь каждый новый запрос к Redis будет выводить в консоль только свежие сообщения.

Работа с потоками Redis выглядит немного сложнее, чем с листами или подписчиками. Поэтому для полноценного понимания потоков при интеграции такого типа очередей в свой проект лучше всего подробно ознакомится с официальными примерами от Redis.

Заключение

В этом руководстве было продемонстрировано несколько базовых способов создания очереди в Redis: Pub/Sub, List и Stream.

Показанные примеры — лишь минимальные реализации, выполняющие логику очереди сообщений. Реальный проект потребует усложнения этой логики, чтобы удовлетворять критериям разработчика и решать поставленные задачи.

Например, функционал очереди сообщений может быть обернут в классы и объекты, либо выполнен в виде отдельной внутренней библиотеки проекта.

Поэтому в каждом конкретном проекте потребуется дальнейшее уникальное развитие этой реализации, чтобы решать задачи

Более подробно ознакомится с командами Redis, которые предназначены для работы с разными инструментами очереди сообщений, можно в специальных разделах официальной документации Redis:

 

Зарегистрируйтесь и начните пользоваться
сервисами Timeweb Cloud прямо сейчас

15 лет опыта
Сосредоточьтесь на своей работе: об остальном позаботимся мы
165 000 клиентов
Нам доверяют частные лица и компании, от небольших фирм до корпораций
Поддержка 24/7
100+ специалистов поддержки, готовых помочь в чате, тикете и по телефону