Публикация и подписка¶
Информация
Предварительные условия
В этом руководстве предполагается, что RabbitMQ установлен и работает на localhost на стандартном порту (5672). Если вы используете другой хост, порт или учетные данные, настройки подключения потребуют корректировки.
Где получить помощь
Если у вас возникли проблемы при прохождении этого руководства, вы можете связаться с нами через GitHub Discussions или RabbitMQ community Discord.
В предыдущем уроке мы создали рабочую очередь. Рабочая очередь основана на предположении, что каждая задача доставляется ровно одному воркеру. В этой части мы сделаем что-то совершенно другое — мы будем доставлять сообщение нескольким потребителям. Этот паттерн известен как «публикация и подписка».
Чтобы проиллюстрировать этот паттерн, мы создадим простую систему регистрации. Она будет состоять из двух программ — первая будет генерировать сообщения журнала, а вторая — принимать и печатать их.
В нашей системе регистрации каждая запущенная копия программы-приемника будет получать сообщения. Таким образом, мы сможем запустить один приемник и направлять журналы на диск, а одновременно с этим запустить другой приемник и просматривать журналы на экране.
По сути, опубликованные сообщения журнала будут транслироваться всем приемникам.
Обменники¶
В предыдущих частях учебника мы отправляли и получали сообщения в очередь и из очереди. Теперь пришло время представить полную модель обмена сообщениями в Rabbit.
Давайте быстро пройдемся по тому, что мы рассмотрели в предыдущих учебниках:
- Производитель — это пользовательское приложение, которое отправляет сообщения.
- Очередь — это буфер, в котором хранятся сообщения.
- Потребитель — это пользовательское приложение, которое принимает сообщения.
Основная идея модели обмена сообщениями в RabbitMQ заключается в том, что производитель никогда не отправляет сообщения напрямую в очередь. На самом деле, довольно часто производитель даже не знает, будет ли сообщение доставлено в какую-либо очередь.
Вместо этого производитель может отправлять сообщения только в обменник. Обменник — это очень простая вещь. С одной стороны, он принимает сообщения от производителей, а с другой — отправляет их в очереди. Обменник должен точно знать, что делать с полученным сообщением. Должно ли оно быть добавлено в определенную очередь? Должно ли оно быть добавлено во многие очереди? Или же оно должно быть отброшено? Правила для этого определяются типом обменника.
flowchart LR
P((P))
X{{X}}
Q1[[Q₁]]
Q2[[Q₂]]
P --> X --> Q1 & Q2
class P mermaid-producer
class X mermaid-exchange
class Q1 mermaid-queue
class Q2 mermaid-queue
Доступно несколько типов обмена: direct
, topic
, headers
и fanout
. Мы сосредоточимся на последнем — fanout
. Давайте создадим обменник этого типа и назовем его logs
:
1 |
|
Обменник fanout
очень прост. Как можно догадаться из названия, он просто транслирует все полученные сообщения во все известные ему очереди. И это именно то, что нам нужно для нашего логгера.
Список обменников
Чтобы отобразить список обменников на сервере, можно запустить полезную команду rabbitmqctl
:
1 |
|
В этом списке будут некоторые обменники amq.*
и обменник по умолчанию (без имени). Они создаются по умолчанию, но в данный момент вам вряд ли понадобится их использовать.
Обменник по умолчанию
В предыдущих частях учебника мы ничего не знали об обменниках, но все равно могли отправлять сообщения в очереди. Это было возможно, потому что мы использовали обменник по умолчанию, который обозначается пустой строкой (""
).
Вспомните, как мы публиковали сообщение ранее:
1 |
|
Здесь мы используем обменник по умолчанию или без имени: сообщения направляются в очередь с именем, указанным в качестве первого параметра, если она существует.
Теперь мы можем вместо этого публиковать в нашем именованном обменнике:
1 |
|
Пустая строка в качестве второго параметра означает, что мы не хотим отправлять сообщение в какую-либо конкретную очередь. Мы хотим только опубликовать его в нашем обменнике «logs
».
Временные очереди¶
Как вы помните, ранее мы использовали очереди с конкретными именами (помните hello
и task_queue
?). Возможность называть очереди было для нас очень важно — нам нужно было направить рабочих к одной и той же очереди. Название очереди важно, когда вы хотите поделиться очередью между производителями и потребителями.
Но это не относится к нашему логгеру. Мы хотим получать все сообщения журнала, а не только их часть. Нас также интересуют только текущие сообщения, а не старые. Для решения этой задачи нам нужно две вещи.
Во-первых, при каждом подключении к Rabbit нам нужна новая пустая очередь. Для этого мы можем создать очередь с случайным именем или, что еще лучше, позволить серверу выбрать для нас случайное имя очереди.
Во-вторых, после отключения потребителя очередь должна быть автоматически удалена.
В клиенте amqp.node, когда мы указываем имя очереди в виде пустой строки, мы создаем недолговечную очередь с сгенерированным именем:
1 2 3 |
|
Когда метод возвращается, экземпляр очереди содержит случайное имя очереди, сгенерированное RabbitMQ. Например, оно может выглядеть как amq.gen-JzTY20BRgKO-HjmUJj0wLg
.
Когда соединение, которое его объявило, закрывается, очередь будет удалена, поскольку она объявлена как эксклюзивная. Вы можете узнать больше о флаге exclusive
и других свойствах очереди в руководстве по очередям.
Привязки¶
flowchart LR
P((P))
X{{X}}
Q1[[Q₁]]
Q2[[Q₂]]
P --> X -- binding --> Q1 & Q2
class P mermaid-producer
class X mermaid-exchange
class Q1 mermaid-queue
class Q2 mermaid-queue
Мы уже создали обменник fanout
и очередь. Теперь нам нужно сказать обменнику, чтобы он отправлял сообщения в нашу очередь. Эта связь между обменником и очередью называется привязкой.
1 |
|
Отныне обменник logs
будет добавлять сообщения в нашу очередь.
Список привязок
Вы можете посмотреть существующие привязки, используя команду:
1 |
|
Складывая все воедино¶
flowchart LR
P((P))
X{{X}}
Q1[[amq.gen-RQ6…]]
Q2[[amq.gen-As8…]]
C1((C₁))
C2((C₂))
P --> X
X --> Q1
X --> Q2
Q1 --> C1
Q2 --> C2
class P mermaid-producer
class X mermaid-exchange
class Q1 mermaid-queue
class Q2 mermaid-queue
class C1 mermaid-consumer
class C2 mermaid-consumer
Программа-производитель, которая выдает сообщения журнала, не сильно отличается от предыдущей. Самое важное изменение заключается в том, что теперь мы хотим публиковать сообщения в нашем обменнике logs
, а не в безымянном. При отправке нам нужно указать маршрутизационный ключ, но его значение игнорируется для обменников fanout
. Вот код для скрипта emit_log.js
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
|
Как видите, после установления соединения мы объявили обменник. Этот шаг необходим, так как публикация на несуществующем обменнике запрещена.
Сообщения будут потеряны, если к обменнику еще не привязана очередь, но для нас это не проблема; если еще нет потребителей, которые бы их прослушивали, мы можем безопасно удалить сообщение.
Код для receive_logs.js
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
|
Если вы хотите сохранить журналы в файл, просто откройте консоль и введите:
1 |
|
Если вы хотите увидеть журналы на экране, запустите новый терминал и выполните:
1 |
|
И, конечно же, для вывода журналов введите:
1 |
|
С помощью rabbitmqctl list_bindings
вы можете убедиться, что код действительно создает привязки и очереди, как мы хотим. При запуске двух программ receive_logs.js
вы должны увидеть примерно следующее:
1 2 3 4 5 |
|
Интерпретация результата проста: данные из журналов обмена поступают в две очереди с именами, назначенными сервером. И это именно то, что мы и хотели.
Чтобы узнать, как прослушивать подмножество сообщений, перейдем к уроку 4.
Источник — https://www.rabbitmq.com/tutorials/tutorial-three-javascript