RabbitMQ Tutorials - Publish/Subscribe (5)
이번 목차는 Publish/Subscribe 에 대한 내용입니다.
이전 소단원에서는 작업 대기열을 만든 후 한개의 Task가 한개의 worker에게 메시지를 전달 하였습니다. 해당 소단원에서는 한개의 Task가 여러 woker에게 메시지를 전달하는 테스트를 진행 할 예정입니다. 이 패턴을 "publish/subscribe" 라고 합니다.
Pub / Sub 패턴을 설명하기 위해 간단한 로깅 시스템을 구축 할 예정입니다. 두개의 프로그램으로 구성되며, 첫 번쨰 프로그램은 로그 메시지를 보내고 두번째 프로그램은 이를 수신하여 인쇄합니다. 로깅 시스템에서 수신기 프로그램의 모든 실행 복사본은 메시지를 받습니다. 그렇게 하면 하나의 수신기를 실행하고 로그를 디스크로 보낼 수 있습니다. 동시에 다른 수신기를 실행하고 화면에서 로그를 볼 수 있습니다.
- Exchange
이전 튜토리얼에서는 메시지를 큐에서 보내고 큐에서 꺼내 사용하는 내용이였습니다. 아래 내용부터는 Rabbit에서 전체 메세징 모델을 소개합니다.
Producer(생산자)는 메시지를 전송하는 사용자 응용 프로그램입니다.
Queue(큐)는 메시지를 저장하는 버퍼입니다.
Consumer(소비자)는 메시지를 받는 사용자 응용 프로그램입니다.
RabbitMQ 메시징 모델의 핵심은 Producer가 Queue에 직접 메시지를 보내지 않는다는 것입니다. Producert는 메시지가 Queue에 Delivery 되는지도 확인 할 수 없습니다.
대신 아래 그림과 같이 Producer는 X(Exchange)에만 메시지를 보낼 수 있습니다.
X(Exchange)는 Producer로부터 메시지를 수신하고 대기열로 메시지를 Push 합니다.
Exchange Type로는 direct, topic, headers, fanout 이 있습니다. 그 중 fanout에 대해 예제를 하려고 합니다.
Fanout 교환은 수신한 모든 메시지를 알고 있는 모든 대기열로 브로드캐스트를 합니다.
Exchange는 아래와 같이 선언 할 수 있습니다.
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!') |
Exchange=”는 교환 이름을 설정하는 것이고 “은 기본 설정을 의미합니다. Exchange 값이 입력되지 않은 경우 기본 설정을 사용하게 되고 메시지는 routin_key 값의 이름을 갖는 Queue에 전달 됩니다.
아래와 같이 이름을 명시한 교환 메시지를 push 할 예정입니다.
channel.basic_publish(exchange='logs', routing_key='', body=message) |
- Temporary Queues
Pub / Sub 방식으로 동작할 때 여러 개의 Suberscirber를 동작시키려면 여러 개의 queue가 필요합니다. 위의 예시에서 disk logging 프로그램과 screen logging 프로그램 각각 quque를 사용해야 하는데 각각의 queue 이름을 queue-disk-loggin,queue-screen-logging 와 같이 정할 수 있지만 새로운 프로그램이 추가될 때마다 queue의 이름을 계속 짓는 것은 불펼 할 수 있습니다.
Temporary Queues 기능을 사용하면 Queue의 이름을 랜덤으로 지을 수 있습니다. 또한, exclusive=True 옵션을 사용해서 Subscriber가 종료되면 해당 Queue를 자동으로 없어지도록 할 수 있습니다.
result = channel.queue_declare(queue='', exclusive=True) |
위와 같이 설정하면 랜덤으로 지어진 queue 이름은 result.method.queueu로 리턴 됩니다.
또한 exclusive=True 옵션에 의해 Subscriber가 종료되면 해당 queue를 자동으로 지워지게 됩니다.
- Temporary Queues
Pub / Sub 방식으로 동작할 때 여러 개의 Suberscirber를 동작시키려면 여러 개의 queue가 필요합니다. 위의 예시에서 disk logging 프로그램과 screen logging 프로그램 각각 quque를 사용해야 하는데 각각의 queue 이름을 queue-disk-loggin,queue-screen-logging 와 같이 정할 수 있지만 새로운 프로그램이 추가될 때마다 queue의 이름을 계속 짓는 것은 불펼 할 수 있습니다.
Temporary Queues 기능을 사용하면 Queue의 이름을 랜덤으로 지을 수 있습니다. 또한, exclusive=True 옵션을 사용해서 Subscriber가 종료되면 해당 Queue를 자동으로 없어지도록 할 수 있습니다.
result = channel.queue_declare(queue='', exclusive=True) |
위와 같이 설정하면 랜덤으로 지어진 queue 이름은 result.method.queueu로 리턴 됩니다.
또한 exclusive=True 옵션에 의해 Subscriber가 종료되면 해당 queue를 자동으로 지워지게 됩니다.
- Bindings
Exchange와 queue를 연결하는 작업입니다. 아래와 같이 코딩하면 logs라는 이름의 exchange와 랜덤으로 지어진 queue를 연결합니다.
1.2.3.1. Python
- Python 소스 코드 작성
emit_log.py (Pusblish)
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection( pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', exchange_type='fanout') message = ' '.join(sys.argv[1:]) or "info: Hello World!" channel.basic_publish(exchange='logs', routing_key='', body=message) print(" [x] Sent %r" % message) connection.close() |
receive_logs.py (Subscribe)
#!/usr/bin/env python import pika connection = pika.BlockingConnection( pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', exchange_type='fanout') result = channel.queue_declare(queue='', exclusive=True) queue_name = result.method.queue channel.queue_bind(exchange='logs', queue=queue_name) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume( queue=queue_name, on_message_callback=callback, auto_ack=True) channel.start_consuming() |
- Message Queue테스트
# 로그를 파일에 저장하기 (shell 1)
mwno@mwno-VirtualBox:/product/python$ python3 receive_logs.py > logs_from_rabbit.log |
* buffer 문제로 로그가 안쌓일 수 있다. 그럴경우 python3 –u receive_logs.py > logs_from_rabbit.log
# 로그 출력을 위한 py 실행 (shell 2)
mwno@mwno-VirtualBox:/product/python$ python3 receive_logs.py [*] Waiting for logs. To exit press CTRL+C |
# 로그를 보낸다 (shell 3)
mwno@mwno-VirtualBox:/product/python$ python3 emit_log.py [x] Sent 'info: Hello World!' |
# rabiitmqctl list_bindings을 사용하여 코드가 실제로 바인딩과 대기열을 원하는대로 생성하였는지 체크
root@f1f30095da0f:/# rabbitmqctl list_bindings Listing bindings for vhost /... source_name source_kind destination_name destination_kind routing_key arguments exchange amq.gen-8IR3LgXd6x_gL7kR4h9wHQ queue amq.gen-8IR3LgXd6x_gL7kR4h9wHQ [] exchange amq.gen-Sjvop-c0Qd-KlASl_mVOjg queue amq.gen-Sjvop-c0Qd-KlASl_mVOjg [] logs exchange amq.gen-8IR3LgXd6x_gL7kR4h9wHQ queue amq.gen-8IR3LgXd6x_gL7kR4h9wHQ [] logs exchange amq.gen-Sjvop-c0Qd-KlASl_mVOjg queue amq.gen-Sjvop-c0Qd-KlASl_mVOjg [] |
* 위의 명령어를 실행하기 도커 이미지에 접속 해야 합니다.
# 도커 프로세스 조회 mwno@mwno-VirtualBox:/product/python$ sudo docker ps -a [sudo] password for mwno: CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES f1f30095da0f rabbitmq:3.9-management "docker-entrypoint.s…" About an hour ago Up About an hour 4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, :::5672->5672/tcp, 15671/tcp, 15691-15692/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp, :::15672->15672/tcp rabbitmq mwno@mwno-VirtualBox:/product/python$ # exec 로 접속하기 mwno@mwno-VirtualBox:~# sudo docker exec -it f1f30095da0f /bin/bash # 접속 완료 root@f1f30095da0f:/# |
# 결과 화면
# 1 로그 파일 확인 (logs_from_rabbit.log)
mwno@mwno-VirtualBox:/product/python$ cat logs_from_rabbit.log [*] Waiting for logs. To exit press CTRL+C [x] b'info: Hello World!' |
# 2 Screen에서 찍히는 메시지 확인
mwno@mwno-VirtualBox:/product/python$ python3 receive_logs.py [*] Waiting for logs. To exit press CTRL+C [x] b'info: Hello World!' |