Пробую rabbitmq
В этом посте хотел попробовать такую технологию, как rabbitmq
, это брокер сообщений, который работает на протоколе AMQP
и служит для передачи сообщений между, например, разными микросервисами, или между разными частями программы.
Иногда при разработке возникает проблема передачи данных или сигналов между разными сервисами, или, например, вы хотите отправить сигнал какому-то "воркеру", чтобы он выполнил какой-то скрипт.
Эти проблемы можно решить с помощью обычных HTTP
запросов, но иногда этот вариант по разным причинам не подходит. И здесь можно посмотреть в сторону такой технологии, как rabbitmq
. В ней есть несколько сущностей, которые я затрону в этом посте: так называемые exchange
, queue
и routing_key
.
Exchage
- это некая абстракция, к которой могут относиться несколько разных очередей - queue
. И есть так называемые routing_key
, которые нужны, чтобы Exchage
"понял" в какую очередь отправить сообщение.
Нам понадобится 2 питоновских файла - producer.py
и consumer.py
. Первый будет слать сообщения в очередь, а второй - читать их. Но сначала, нужно поставить и поднять собственно сам rabbitmq
локально, удобнее всего, мне кажется, сделать это с помощью docker
:
sudo docker run -d -p 5672:5672 rabbitmq
В этой команде флаг -d
означает запуск в фоновом режиме, флаг -p
отвечает за "проброс" порта, а rabbitmq
- это название нужного нам контейнера. После этого, если все нормально, контейнер поднимется и можно проверить, что все ок:
sudo docker ps -a
Теперь, у нас есть rabbitmq
локально, и мы можем начать писать код, сначала, поставим библиотеку pika, она нам понадобится для взаимодействия с нашим локально поднятым кроликом:
pip install pika
Создадим папку и файлы:
mkdir rabbit_test cd rabbit_test touch consumer.py producer.py
Теперь, напишем наш consumer
в файле consumer.py
:
1 import pika 2 3 4 def on_message(channel, method_frame, header_frame, body): 5 print(method_frame.delivery_tag) 6 print(body) 7 channel.basic_ack(delivery_tag=method_frame.delivery_tag) 8 9 10 parameters = pika.connection.URLParameters("amqp://guest:guest@localhost:5672/%2F") 11 connection = pika.BlockingConnection(parameters) 12 13 # Open the channel 14 channel = connection.channel() 15 16 # Declare the exchange 17 exchange_name = "test_exchange" 18 channel.exchange_declare(exchange_name) 19 20 # Declare the queue 21 queue_name = "test_queue" 22 channel.queue_declare( 23 queue=queue_name, durable=True, exclusive=False, auto_delete=False 24 ) 25 26 # Bind of queue to exchange 27 routing_key = "test_routing_key" 28 channel.queue_bind(queue_name, exchange_name, routing_key) 29 30 channel.basic_consume(queue_name, on_message) 31 32 33 if __name__ == "__main__": 34 print("Start of consumer") 35 try: 36 channel.start_consuming() 37 except KeyboardInterrupt: 38 channel.stop_consuming() 39 connection.close()
producer.py
:
1 import pika 2 3 4 # Open a connection to RabbitMQ on localhost 5 parameters = pika.connection.URLParameters("amqp://guest:guest@localhost:5672/%2F") 6 connection = pika.BlockingConnection(parameters) 7 8 # Open the channel 9 channel = connection.channel() 10 11 # Turn on delivery confirmations 12 channel.confirm_delivery() 13 14 exchange_name = "test_exchange" 15 routing_key = "test_routing_key" 16 17 if __name__ == "__main__": 18 print("Start of producer") 19 # Send a message 20 try: 21 channel.basic_publish( 22 exchange=exchange_name, 23 routing_key=routing_key, 24 body="Hello World!", 25 properties=pika.BasicProperties( 26 content_type="text/plain", 27 delivery_mode=pika.DeliveryMode.Transient, 28 ), 29 ) 30 print("Message publish was confirmed") 31 except pika.exceptions.UnroutableError: 32 print("Message could not be confirmed")
Сначала поднимем consumer
, чтобы создать exchange
и queue
:
python consumer.py Start of consumer
Если consumer
поднялся без ошибок, теперь отправим сообщение в очередь:
python producer.py Start of producer Message publish was confirmed
В консоли consumer
'a видим:
1 b'Hello World!'
Вроде все получилось, теперь можно остановить и удалить контейнер с rabbitmq
:
sudo docker ps -a sudo docker stop <cont_id> sudo docker container rm <cont_id> sudo docker container prune