Пробую rabbitmq
В этом посте хотел попробовать такую технологию, как rabbitmq
, это брокер сообщений, который работает на протоколе AMQP
и служит для передачи сообщений между, например, разными микросервисами, или между разными частями программы.
Иногда при разработке возникает проблема передачи данных или сигналов между разными сервисами, или, например, вы хотите отправить сигнал какому-то "воркеру", чтобы он выполнил какой-то скрипт.
Эти проблемы можно решить с помощью обычных HTTP
запросов, но иногда этот вариант по разным причинам не подходит. И здесь можно посмотреть в сторону такой технологии, как rabbitmq
. В ней есть несколько сущностей, которые я затрону в этом посте: так называемые exchange
, queue
и routing_key
.
Exchage
- это некая абстракция, к которой могут относиться несколько разных очередей - queue
. И есть так называемые routing_key
, которые нужны, чтобы Exchage
"понял" в какую очередь отправить сообщение.
Нам понадобится 2 питоновских файла - producer.py
и consumer.py
. Первый будет слать сообщения в очередь, а второй - читать их. Но сначала, нужно поставить и поднять собственно сам rabbitmq
локально, удобнее всего, мне кажется, сделать это с помощью docker
:
sudo docker run -d -p 5672:5672 rabbitmq
В этой команде флаг -d
означает запуск в фоновом режиме, флаг -p
отвечает за "проброс" порта, а rabbitmq
- это название нужного нам контейнера. После этого, если все нормально, контейнер поднимется и можно проверить, что все ок:
sudo docker ps -a
Теперь, у нас есть rabbitmq
локально, и мы можем начать писать код, сначала, поставим библиотеку pika, она нам понадобится для взаимодействия с нашим локально поднятым кроликом:
pip install pika
Создадим папку и файлы:
mkdir rabbit_test
cd rabbit_test
touch consumer.py producer.py
Теперь, напишем наш consumer
в файле consumer.py
:
1import pika 2 3 4def on_message(channel, method_frame, header_frame, body): 5 print(method_frame.delivery_tag) 6 print(body) 7 channel.basic_ack(delivery_tag=method_frame.delivery_tag) 8 9 10parameters = pika.connection.URLParameters("amqp://guest:guest@localhost:5672/%2F") 11connection = pika.BlockingConnection(parameters) 12 13# Open the channel 14channel = connection.channel() 15 16# Declare the exchange 17exchange_name = "test_exchange" 18channel.exchange_declare(exchange_name) 19 20# Declare the queue 21queue_name = "test_queue" 22channel.queue_declare( 23 queue=queue_name, durable=True, exclusive=False, auto_delete=False 24) 25 26# Bind of queue to exchange 27routing_key = "test_routing_key" 28channel.queue_bind(queue_name, exchange_name, routing_key) 29 30channel.basic_consume(queue_name, on_message) 31 32 33if __name__ == "__main__": 34 print("Start of consumer") 35 try: 36 channel.start_consuming() 37 except KeyboardInterrupt: 38 channel.stop_consuming() 39 connection.close()
producer.py
:
1import pika 2 3 4# Open a connection to RabbitMQ on localhost 5parameters = pika.connection.URLParameters("amqp://guest:guest@localhost:5672/%2F") 6connection = pika.BlockingConnection(parameters) 7 8# Open the channel 9channel = connection.channel() 10 11# Turn on delivery confirmations 12channel.confirm_delivery() 13 14exchange_name = "test_exchange" 15routing_key = "test_routing_key" 16 17if __name__ == "__main__": 18 print("Start of producer") 19 # Send a message 20 try: 21 channel.basic_publish( 22 exchange=exchange_name, 23 routing_key=routing_key, 24 body="Hello World!", 25 properties=pika.BasicProperties( 26 content_type="text/plain", 27 delivery_mode=pika.DeliveryMode.Transient, 28 ), 29 ) 30 print("Message publish was confirmed") 31 except pika.exceptions.UnroutableError: 32 print("Message could not be confirmed")
Сначала поднимем consumer
, чтобы создать exchange
и queue
:
python consumer.py
Start of consumer
Если consumer
поднялся без ошибок, теперь отправим сообщение в очередь:
python producer.py
Start of producer
Message publish was confirmed
В консоли consumer
'a видим:
1
b'Hello World!'
Вроде все получилось, теперь можно остановить и удалить контейнер с rabbitmq
:
sudo docker ps -a
sudo docker stop <cont_id>
sudo docker container rm <cont_id>
sudo docker container prune