Apache Kafka является сложным инфраструктурным компонентом, который порой играет ключевую роль в микросервисной архитектуре. Обеспечение безопасности Apache Kafka — это не просто опция, а необходимость, особенно если происходит взаимодействие с конфиденциальными данными, такими как персональная информация или финансовые транзакции. Основные угрозы включают несанкционированный доступ, перехват сообщений и атаки на инфраструктуру. Сегодня мы рассмотрим практические шаги, как обезопасить кластер Kafka.
Предварительные требования
Так как мы будем рассматривать практическую часть, нам понадобится следующее:
-
Один облачный (виртуальный) сервер с предустановленным дистрибутивом Linux Ubuntu 22.04.
-
Установленный Apache Kafka. Инструкция по установке Kafka приведена в разделе «Установка и запуск Apache Kafka» в статье «Мониторинг Apache Kafka».
В качестве минимальных характеристик облачного сервера необходимо выбрать конфигурацию, содержащую двуядерный процессор и 4 ГБ оперативной памяти. Дополнительным требованием является аренда публичного IPv4 IP-адреса, который можно арендовать в разделе «Сеть».
После заказа облачного сервера он будет готов к использованию в течение пары минут. IPv4 адрес, логин и пароль для подключения по протоколу SSH будут сгенерированы и доступны в разделе «Дашборд».
cloud
Базовые настройки безопасности
Прежде чем приступать к более серьезным настройкам безопасности, затронем базовые моменты.
Обновление Kafka
Команда разработки Kafka стабильно выпускает три релиза в год. В новых версиях исправляются ошибки и устраняются проблемы, связанные с безопасностью. За выходом новых версий можно следить на официальном сайте. Там же доступны ссылки для скачивания новых версий Kafka.
Использование firewall
Как правило, Kafka разворачивается в частной сети без доступа извне. Однако если по каким-то причинам доступ до кластера возможен из публичной сети или необходимо ограничить доступ до кластера во внутренней сети, достаточно закрыть порты, которые использует Kafka. По умолчанию Kafka использует следующие порты:
Порт |
Наименование |
9092 |
Порт для клиентских подключений |
9093 |
Порт для защищенных подключений (при условии, что соединение было настроено заранее по TLS протоколу) |
9094 |
Порт для SASL-аутентификации (при условии, что соединение было настроено заранее) |
2181 |
Порт используемый ZooKeeper (если Kafka использует ZooKeeper) |
Чтобы закрыть доступ к портам Kafka, можно воспользоваться утилитой iptables. Ниже приведены практические примеры. В них мы используем подсеть 192.168.1.0/24
— у вас она будет отличаться. Чтобы узнать свой внешний IP-адрес, можно воспользоваться, например, сервисом 2ip.ru, выполнив в терминале:
curl 2ip.ru
Разрешить доступ из подсети 192.168.1.0/24:
iptables -A INPUT -s 192.168.1.0/24 -p tcp -m multiport --dports 9092,9093,9094,2181 -j ACCEPT
Заблокировать доступ из внешней сети:
iptables -A INPUT -i eth0 -p tcp -m multiport --dports 9092,9093,9094,2181 -j DROP
Не забудьте сохранить правила при помощи команды:
iptables-save > /etc/iptables/rules.v4
Также убедитесь, что в системе установлен пакет iptables-persistent
, чтобы правила применялись после перезагрузки. Для установки пакета iptables-persistent
в разных дистрибутивах Linux необходимо выполнить команды ниже.
Для установки в Debian-based-дистрибутивах (Ubuntu, Debian, Linux Mint и т.д.):
apt update && apt -y install iptables-persistent
Для установки в RPM-дистрибутивах (CentOS, Fedora, RHEL и т.д.):
Для менеджера пакетов yum
:
yum install iptables-services
Для менеджера пакетов dnf
:
dnf install iptables-services
При использовании UFW (Uncomplicated Firewall) стоит обратить внимание, что по умолчанию UFW блокирует все входящие соединения. Для того чтобы открыть доступ до необходимых портов, необходимо выполнить команды ниже.
Открыть сетевой доступ до портов 9092, 9093, 9094, 2181 из подсети 192.168.1.0/24:
ufw allow from 192.168.1.0/24 to any port 9092
ufw allow from 192.168.1.0/24 to any port 9093
ufw allow from 192.168.1.0/24 to any port 9094
ufw allow from 192.168.1.0/24 to any port 2181
Создание пользователя Kafka
Убедитесь, что при установке для всех директорий и файлов, используемых Kafka, был задан отдельный пользователь. Для проверки достаточно выполнить команду ls -l
, чтобы отобразить имя пользователя (предполагается, что Kafka установлен в директорию /opt/kafka
, при необходимости замените директорию установки на нужную):
ls -l /opt/kafka
Если задан другой пользователь, то необходимо создать пользователя kafka
. Для этого выполните команду:
useradd -r -m -s /bin/false kafka
Далее выдаем пользователю kafka
права на директорию установки:
chown -R kafka:kafka /opt/kafka
Расширенные настройки безопасности
В качестве расширенных настроек безопасности мы рассмотрим настройку шифрования TLS и настройку аутентификации.
Настройка шифрования TLS
TLS в Apache Kafka используется для обеспечения шифрования и безопасности передачи данных между клиентами и брокерами. Для настройки TLS необходимо выполнить шаги, перечисленные ниже.
- Переходим в домашнюю директорию пользователя
kafka
, которого мы создали ранее:
cd /home/kafka
Создаем новую директорию с именем kafka-ssl
, где будут храниться SSL-сертификаты, и переходим в нее:
mkdir -p kafka-ssl && cd kafka-ssl
- Создаем сертификат и его закрытый ключ:
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365 -nodes -subj "/C=RU/ST=Moscow/L=Moscow/O=Org/OU=Unit/CN=localhost"
На выходе у нас будут два файла:
-
-
ca-key
— файл с закрытым ключом (private key); -
ca-cert
— файл с открытым ключом (public key).
-
- Каждому брокеру Kafka требуется свой собственный сертификат, подписанный центром сертификации. Создадим запрос на выдачу сертификата для брокера:
openssl req -new -newkey rsa:2048 -keyout broker-key.pem -out broker-csr.pem -nodes -subj "/C=RU/ST=Moscow/L=Moscow/O=Org/OU=Unit/CN=localhost"
- Подпишем его в центре сертификации:
openssl x509 -req -CA ca-cert -CAkey ca-key -in broker-csr.pem -out broker-cert.pem -days 365 -CAcreateserial
- Конвертируем выпущенный сертификат для брокера и его закрытый ключ в двоичный формат PKCS12:
openssl pkcs12 -export -in broker-cert.pem -inkey broker-key.pem -out broker.p12 -name kafka-broker -passout pass:StrongPassword12345
Вместо StrongPassword12345
необходимо задать свой пароль для цепочки сертификатов.
На выходе получим файл с именем broker.p12
.
- Импортируем файл с сертификатом в формате PKCS12 в хранилище ключей Java (Java Keystore):
keytool -importkeystore -srckeystore broker.p12 -srcstoretype PKCS12 -destkeystore kafka.broker.keystore.jks -deststoretype JKS -srcstorepass StrongPassword12345 -deststorepass StrongPassword12345
Параметры srcstorepass
и deststorepass
используются для указания паролей для исходного и целевого хранилищ ключей (keystore). Вместо StrongPassword12345
необходимо задать свои пароли.
При успешном импорте в терминале появится фраза:
Entry for alias kafka-broker successfully imported.
Import command completed: 1 entries successfully imported, 0 entries failed or cancelled
- Импортируем сертификат удостоверяющего центра в хранилище ключей keystore:
keytool -import -file ca-cert -keystore kafka.broker.keystore.jks -storepass StrongPassword12345 -noprompt
Вместо StrongPassword12345
необходимо задать свой пароль для доступа к целевому хранилищу ключей.
- Создаем TrustStore (специальное хранилище ключей, которое используется для хранения доверенных сертификатов) и импортируем сертификат удостоверяющего центра:
keytool -import -file ca-cert -keystore kafka.truststore.jks -storepass StrongPassword12345 -noprompt
Вместо StrongPassword12345
необходимо использовать пароль, который мы задали на предыдущем шаге.
- После того как все сертификаты были созданы, необходимо назначить владельца
kafka
, которого мы создали ранее, на директориюkafka-ssl
и на все файлы внутри нее:
chown -R kafka:kafka /home/kafka/kafka-ssl
- При помощи любого текстового редактора открываем основной конфигурационный файл сервера Kafka —
server.properties
:
nano /opt/kafka/config/server.properties
Добавляем в самый конец файла строки:
listeners=SSL://localhost:9093
advertised.listeners=SSL://localhost:9093
security.inter.broker.protocol=SSL
ssl.keystore.location=/home/kafka/kafka-ssl/kafka.broker.keystore.jks
ssl.keystore.password=StrongPassword12345
ssl.key.password=StrongPassword12345
ssl.truststore.location=/home/kafka/kafka-ssl/kafka.truststore.jks
ssl.truststore.password=StrongPassword12345
ssl.enabled.protocols=TLSv1.2,TLSv1.3
ssl.keystore.type=JKS
ssl.truststore.type=JKS
Замените значения следующих параметров на актуальные для вас:
ssl.keystore.location
— полный путь до файлаkeystore.jks
;ssl.keystore.password
— пароль, который был задан на этапе генерации файлаkeystore.jks
;ssl.key.password
— пароль, который был задан на этапе конвертации сертификата и закрытого ключа в двоичный формат PKCS12;ssl.truststore.location
— полный путь до файлаkafka.truststore.jks
;ssl.truststore.password
— пароль, который был задан во время создания TrustStore.
- Далее создаем новый файл, в котором будут храниться настройки клиента:
nano /opt/kafka/config/client.properties
Используем следующие настройки:
security.protocol=SSL
ssl.truststore.location=/home/kafka/kafka-ssl/kafka.truststore.jks
ssl.truststore.password=StrongPassword12345
ssl.enabled.protocols=TLSv1.2,TLSv1.3
ssl.truststore.type=JKS
Замените значения следующих параметров на свои:
-
ssl.truststore.location
— полный путь до файлаkafka.truststore.jks
;ssl.truststore.password
— пароль, который был задан во время создания TrustStore.
- После того как все необходимые настройки были внесены, перезапускаем Kafka и проверяем его статус:
systemctl restart kafka && systemctl status kafka
- Протестируем готовую конфигурацию. Для этого создадим новый топик с именем
new-topic-tls
:
/opt/kafka/bin/kafka-topics.sh --create --topic new-topic-tls --bootstrap-server localhost:9093 --command-config /opt/kafka/config/client.properties --partitions 1 --replication-factor 1
Обратите внимание, что подключение к брокеру Kafka осуществляется по новому порту — 9093. Также мы указали полный путь до ранее созданного клиентского файла client.properties
, в котором прописаны настройки для использования TLS.
Если в конфигурационных файлах нет ошибок, топик будет успешно создан. Чтобы убедиться в этом, воспользуемся командой для вывода всех топиков в брокере:
/opt/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9093 --command-config /opt/kafka/config/client.properties
- Также проверим работоспособность производителя (Producer) и потребителя (Consumer). Для этого сначала запускаем производителя:
/opt/kafka/bin/kafka-console-producer.sh --topic new-topic-tls --bootstrap-server localhost:9093 --producer.config /opt/kafka/config/client.properties
Вводим любое сообщение, например Test TLS in Kafka
.
- Далее открываем новую сессию SSH, не закрывая текущую, и запускаем потребителя:
/opt/kafka/bin/kafka-console-consumer.sh --topic new-topic-tls --bootstrap-server localhost:9093 --consumer.config /opt/kafka/config/client.properties --from-beginning
Ранее отправленное сообщение было успешно получено.
- Также дополнительно можно убедиться, что весь трафик идет по TLS. Для этого в основном лог-файле сервера
server.log
выведем строки, содержащие слово SSL:
cat /opt/kafka/logs/server.log | grep SSL
Настройка аутентификации при помощи SASL
Существует еще один способ защиты Kafka, который заключается в использовании аутентификации. В этом случае в конфигурации Kafka заранее задаются пользователи, которые могут получить доступ до Kafka.
SASL (Simple Authentication and Security Layer) — это фреймворк для обеспечения аутентификации и защиты данных в сетевых протоколах. Он используется для добавления механизмов аутентификации (например, через логин/пароль, токены или Kerberos) в протоколы, такие как SMTP, IMAP, XMPP и другие.
- Создаем новый конфигурационный файл с именем
kafka_server_jaas.conf
, в котором будут храниться настройки SASL:
nano /opt/kafka/config/kafka_server_jaas.conf
Используем следующее содержимое:
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret"
user_client1="client1-secret";
};
- Меняем владельца и группу созданного файла на пользователя
kafka
:
chown kafka:kafka /opt/kafka/config/kafka_server_jaas.conf
- Далее необходимо добавить файл
kafka_server_jaas.conf
в автозапускsystemd
-юнита Kafka. Для этого открываем на редактирование файлkafka.service
:
nano /etc/systemd/system/kafka.service
В блок [Service]
добавляем строку:
Environment="KAFKA_OPTS=-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf"
Сохраняем изменения и выходим из файла.
- Перезапускаем демон
systemd
:
systemctl daemon-reload
- Открываем на редактирование основной файл сервера Kafka:
nano /opt/kafka/config/server.properties
Находим параметры ниже и приводим их к следующему виду:
listeners=SASL_SSL://localhost:9093
advertised.listeners=SASL_SSL://localhost:9093
security.inter.broker.protocol=SASL_SSL
ssl.client.auth=required
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
Итоговый вид конфигурации, включающий в себя также параметры для TLS (из предыдущего раздела), выглядит следующим образом:
Сохраняем изменения и выходим из файла.
- Создаем файл для клиентской части SASL:
nano /opt/kafka/config/kafka_client_jaas.conf
Используем содержимое ниже:
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="client1"
password="client1-secret";
};
Сохраняем изменения и выходим из файла.
- Меняем владельца и группу созданного файла на пользователя
kafka
:
chown kafka:kafka /opt/kafka/config/kafka_client_jaas.conf
- Далее создаем клиентский файл
client-sasl.properties
:
nano /opt/kafka/config/client-sasl.properties
Со следующим содержимым:
bootstrap.servers=localhost:9093
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="client1" \
password="client1-secret";
ssl.truststore.location=/home/kafka/kafka-ssl/kafka.truststore.jks
ssl.truststore.password=StrongPassword12345
Сохраняем изменения и выходим из файла.
- Перезапускаем Kafka:
systemctl restart kafka
- Протестируем готовую конфигурацию. Для этого создадим новый топик с именем
sasl-topic
:
/opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9093 --command-config /opt/kafka/config/client-sasl.properties --create --topic sasl-topic --partitions 1 --replication-factor 1
Выведем список всех топиков:
/opt/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9093 --command-config /opt/kafka/config/client-sasl.properties
- Также проверим работоспособность производителя (Producer) и потребителя (Consumer). Для этого сначала запускаем производителя:
/opt/kafka/bin/kafka-console-producer.sh --bootstrap-server localhost:9093 --topic sasl-topic --producer.config /opt/kafka/config/client-sasl.properties
Вводим любое сообщение, например Kafka test with SASL
.
- Далее открываем новую сессию SSH, не закрывая текущую, и запускаем потребителя:
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic sasl-topic --consumer.config /opt/kafka/config/client-sasl.properties --from-beginning
Ранее отправленное сообщение было получено.
Разверните Kafka на своем облачном сервере
Заключение
Сегодня мы рассмотрели способы защиты инфраструктурного компонента Apache Kafka. Защита Apache Kafka является критически важным аспектом обеспечения безопасности данных в распределенных системах. Эффективная защита требует комплексного подхода, включающего настройку шифрования (SSL/TLS), аутентификацию (SASL), а также регулярный мониторинг и обновление системы. Эти меры минимизируют риски несанкционированного доступа, утечек данных и атак, обеспечивая надежность и конфиденциальность при передаче и обработке сообщений в Kafka.