Пробую rabbitmq | Блог python программиста
Изображение гика

Блог питониста

Пробую rabbitmq

12 октября 2024 г.

В этом посте хотел попробовать такую технологию, как rabbitmq, это брокер сообщений, который работает на протоколе AMQP и служит для передачи сообщений между, например, разными микросервисами, или между разными частями программы.

Иногда при разработке возникает проблема передачи данных или сигналов между разными сервисами, или, например, вы хотите отправить сигнал какому-то "воркеру", чтобы он выполнил какой-то скрипт.

Эти проблемы можно решить с помощью обычных HTTP запросов, но иногда этот вариант по разным причинам не подходит. И здесь можно посмотреть в сторону такой технологии, как rabbitmq. В ней есть несколько сущностей, которые я затрону в этом посте: так называемые exchange, queue и routing_key.

Exchage - это некая абстракция, к которой могут относиться несколько разных очередей - queue. И есть так называемые routing_key, которые нужны, чтобы Exchage "понял" в какую очередь отправить сообщение.

Нам понадобится 2 питоновских файла - producer.py и consumer.py. Первый будет слать сообщения в очередь, а второй - читать их. Но сначала, нужно поставить и поднять собственно сам rabbitmq локально, удобнее всего, мне кажется, сделать это с помощью docker:

sudo docker run -d -p 5672:5672 rabbitmq

В этой команде флаг -d означает запуск в фоновом режиме, флаг -p отвечает за "проброс" порта, а rabbitmq - это название нужного нам контейнера. После этого, если все нормально, контейнер поднимется и можно проверить, что все ок:

sudo docker ps -a

Теперь, у нас есть rabbitmq локально, и мы можем начать писать код, сначала, поставим библиотеку pika, она нам понадобится для взаимодействия с нашим локально поднятым кроликом:

pip install pika

Создадим папку и файлы:

mkdir rabbit_test

cd rabbit_test

touch consumer.py producer.py

Теперь, напишем наш consumer в файле consumer.py:

 1 import pika
 2 
 3 
 4 def on_message(channel, method_frame, header_frame, body):
 5     print(method_frame.delivery_tag)
 6     print(body)
 7     channel.basic_ack(delivery_tag=method_frame.delivery_tag)
 8 
 9 
10 parameters = pika.connection.URLParameters("amqp://guest:guest@localhost:5672/%2F")
11 connection = pika.BlockingConnection(parameters)
12 
13 # Open the channel
14 channel = connection.channel()
15 
16 # Declare the exchange
17 exchange_name = "test_exchange"
18 channel.exchange_declare(exchange_name)
19 
20 # Declare the queue
21 queue_name = "test_queue"
22 channel.queue_declare(
23     queue=queue_name, durable=True, exclusive=False, auto_delete=False
24 )
25 
26 # Bind of queue to exchange
27 routing_key = "test_routing_key"
28 channel.queue_bind(queue_name, exchange_name, routing_key)
29 
30 channel.basic_consume(queue_name, on_message)
31 
32 
33 if __name__ == "__main__":
34     print("Start of consumer")
35     try:
36         channel.start_consuming()
37     except KeyboardInterrupt:
38         channel.stop_consuming()
39     connection.close()

producer.py:

 1 import pika
 2 
 3 
 4 # Open a connection to RabbitMQ on localhost
 5 parameters = pika.connection.URLParameters("amqp://guest:guest@localhost:5672/%2F")
 6 connection = pika.BlockingConnection(parameters)
 7 
 8 # Open the channel
 9 channel = connection.channel()
10 
11 # Turn on delivery confirmations
12 channel.confirm_delivery()
13 
14 exchange_name = "test_exchange"
15 routing_key = "test_routing_key"
16 
17 if __name__ == "__main__":
18     print("Start of producer")
19     # Send a message
20     try:
21         channel.basic_publish(
22             exchange=exchange_name,
23             routing_key=routing_key,
24             body="Hello World!",
25             properties=pika.BasicProperties(
26                 content_type="text/plain",
27                 delivery_mode=pika.DeliveryMode.Transient,
28             ),
29         )
30         print("Message publish was confirmed")
31     except pika.exceptions.UnroutableError:
32         print("Message could not be confirmed")

Сначала поднимем consumer, чтобы создать exchange и queue:

python consumer.py

Start of consumer

Если consumer поднялся без ошибок, теперь отправим сообщение в очередь:

python producer.py

Start of producer
Message publish was confirmed

В консоли consumer'a видим:

1
b'Hello World!'

Вроде все получилось, теперь можно остановить и удалить контейнер с rabbitmq:

sudo docker ps -a

sudo docker stop <cont_id>

sudo docker container rm <cont_id>

sudo docker container prune

Метки

python
Если вам понравился пост, можете поделиться им в соцсетях: