RPC¶
Информация
Предварительные условия
В этом руководстве предполагается, что RabbitMQ установлен и работает на localhost на стандартном порту (5672). Если вы используете другой хост, порт или учетные данные, настройки подключения потребуют корректировки.
Где получить помощь
Если у вас возникли проблемы при прохождении этого руководства, вы можете связаться с нами через GitHub Discussions или RabbitMQ community Discord.
Во втором уроке мы узнали, как использовать Work Queues для распределения трудоемких задач между несколькими работниками.
Но что, если нам нужно запустить функцию на удаленном компьютере и дождаться результата? Ну, это уже совсем другая история. Этот паттерн широко известен как Remote Procedure Call или RPC.
В этом уроке мы будем использовать RabbitMQ для построения системы RPC: клиента и масштабируемого сервера RPC. Поскольку у нас нет трудоемких задач, которые стоило бы распределять, мы создадим фиктивный сервис RPC, который возвращает числа Фибоначчи.
Примечание по поводу RPC
Хотя RPC является довольно распространенным паттерном в вычислениях, его часто критикуют. Проблемы возникают, когда программист не знает, является ли вызов функции локальным или это медленный RPC. Подобные путаницы приводят к непредсказуемости системы и добавляют ненужную сложность к отладке. Вместо упрощения программного обеспечения, неправильное использование RPC может привести к появлению неуправляемого спагетти-кода.
Имея это в виду, примите во внимание следующие советы:
- Убедитесь, что очевидно, какой вызов функции является локальным, а какой — удаленным.
- Документируйте свою систему. Четко обозначьте зависимости между компонентами.
- Обрабатывайте ошибки. Как должен реагировать клиент, когда сервер RPC долго не работает?
В случае сомнений избегайте RPC. Если возможно, используйте асинхронный конвейер — вместо блокировки, подобной RPC, результаты асинхронно передаются на следующий этап вычислений.
Очередь обратных вызовов¶
В целом, выполнение RPC через RabbitMQ не представляет сложности. Клиент отправляет запрос, а сервер отвечает сообщением. Чтобы получить ответ, необходимо отправить адрес очереди обратных вызовов вместе с запросом. Давайте попробуем:
1 2 3 4 5 6 7 8 9 10 |
|
Свойства сообщения
Протокол AMQP 0-9-1 предопределяет набор из 14 свойств, которые сопровождают сообщение. Большинство свойств используются редко, за исключением следующих:
persistent
: помечает сообщение как постоянное (со значениемtrue
) или временное (false
). Вы можете помнить это свойство из второго учебника .content_type
: используется для описания MIME-типа кодировки. Например, для часто используемой кодировки JSON рекомендуется установить для этого свойства значение:application/json
.reply_to
: обычно используется для именования очереди обратных вызовов.correlation_id
: полезно для сопоставления ответов RPC с запросами.
Идентификатор корреляции¶
Создание очереди обратных вызовов для каждого запроса RPC неэффективно. Лучше создать одну очередь обратных вызовов для каждого клиента.
Это поднимает новую проблему: после получения ответа в этой очереди неясно, к какому запросу он относится. Именно в этом случае используется свойство correlation_id
. Мы установим для него уникальное значение для каждого запроса. Позже, когда мы получим сообщение в очереди обратных вызовов, мы посмотрим на это свойство и на его основе сможем сопоставить ответ с запросом. Если мы увидим неизвестное значение correlation_id
, мы можем безопасно отбросить сообщение — оно не относится к нашим запросам.
Вы можете спросить, почему мы должны игнорировать неизвестные сообщения в очереди обратных вызовов, а не выдавать ошибку? Это связано с возможностью возникновения условия гонки на стороне сервера. Хотя это маловероятно, но возможно, что RPC-сервер завершит работу сразу после отправки нам ответа, но до отправки подтверждающего сообщения для запроса. Если это произойдет, перезапущенный RPC-сервер обработает запрос снова. Вот почему на клиенте мы должны корректно обрабатывать дубликаты ответов, а RPC в идеале должен быть идемпотентным.
Резюме¶
flowchart LR
C((Client))
S((Server))
Q1[[rpc_queue]]
Q2[[amq.gen-Xa2…]]
Request["`Request
reply_to=amq.gen-Xa2…
correlation_id=abc`"]
Reply["`Reply
correlation_id=abc`"]
C --- Request --> Q1 --> S --> Q2 --- Reply --> C
class C mermaid-producer
class Q1 mermaid-queue
class Q2 mermaid-queue
class S mermaid-consumer
class Request mermaid-msg
class Reply mermaid-msg
Наш RPC будет работать следующим образом:
- При запуске клиент создает эксклюзивную очередь обратных вызовов.
- Для RPC-запроса клиент отправляет сообщение с двумя свойствами:
reply_to
, которое устанавливается в очередь обратных вызовов, иcorrelation_id
, которое устанавливается в уникальное значение для каждого запроса. - Запрос отправляется в очередь
rpc_queue
. - Рабочий процесс RPC (он же сервер) ожидает запросов в этой очереди. Когда появляется запрос, он выполняет задачу и отправляет сообщение с результатом обратно клиенту, используя очередь из поля
reply_to
. - Клиент ожидает данных в очереди обратных вызовов. Когда появляется сообщение, он проверяет свойство
correlation_id
. Если оно совпадает со значением из запроса, он возвращает ответ приложению.
Сводка¶
Функция Фибоначчи:
1 2 3 4 |
|
Мы объявляем нашу функцию Фибоначчи. Она принимает только действительные положительные целые числа в качестве входных данных. (Не ожидайте, что она будет работать с большими числами, и, вероятно, это самая медленная рекурсивная реализация из возможных).
Код для нашего RPC-сервера rpc_server.js
выглядит следующим образом:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
|
Код сервера довольно прост:
- Как обычно, мы начинаем с установления соединения, канала и объявления очереди.
- Возможно, мы захотим запустить более одного серверного процесса. Чтобы равномерно распределить нагрузку между несколькими серверами, нам нужно установить настройку
prefetch
на канале. - Мы используем
Channel.consume
для потребления сообщений из очереди. Затем мы входим в функцию обратного вызова, где выполняем работу и отправляем ответ обратно.
Код для нашего RPC-клиента rpc_client.js
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
|
Сейчас самое время взглянуть на полный пример исходного кода для rpc_client.js
и rpc_server.js
.
Наш RPC-сервис готов. Можно запускать сервер:
1 2 |
|
Чтобы запросить число Фибоначчи, запустите клиент:
1 2 |
|
Представленный здесь дизайн не является единственной возможной реализацией службы RPC, но он имеет ряд важных преимуществ:
- Если сервер RPC работает слишком медленно, вы можете увеличить его производительность, просто запустив еще один. Попробуйте запустить второй
rpc_server.js
в новой консоли. - На стороне клиента RPC требует отправки и получения только одного сообщения. В результате RPC-клиенту требуется только один сетевой цикл для одного RPC-запроса.
Наш код по-прежнему довольно прост и не пытается решать более сложные (но важные) проблемы, такие как:
- Как должен реагировать клиент, если нет работающих серверов?
- Должен ли клиент иметь какой-то тайм-аут для RPC?
- Если сервер неисправен и выдает исключение, следует ли его пересылать клиенту?
- Защита от недействительных входящих сообщений (например, проверка границ, типа) перед обработкой.
Если вы хотите поэкспериментировать, вам может пригодиться интерфейс управления для просмотра очередей.
Источник — https://www.rabbitmq.com/tutorials/tutorial-six-javascript