개인 저장용 블로그

RabbitMQ Tutorials - Publish/Subscribe (5) 본문

오픈소스/RabbitMQ

RabbitMQ Tutorials - Publish/Subscribe (5)

우엉잇 2022. 4. 4. 11:31
이번 목차는 Publish/Subscribe 에 대한 내용입니다. 

 

 이전 소단원에서는 작업 대기열을 만든 한개의 Task 한개의 worker에게 메시지를 전달 하였습니다. 해당 소단원에서는 한개의 Task 여러 woker에게 메시지를 전달하는 테스트를 진행 예정입니다. 패턴을  "publish/subscribe" 라고 합니다

 

 Pub / Sub 패턴을 설명하기 위해 간단한 로깅 시스템을 구축 예정입니다. 두개의 프로그램으로 구성되며, 번쨰 프로그램은 로그 메시지를 보내고 두번째 프로그램은 이를 수신하여 인쇄합니다. 로깅 시스템에서 수신기 프로그램의 모든 실행 복사본은 메시지를 받습니다. 그렇게 하면 하나의 수신기를 실행하고 로그를 디스크로 보낼 있습니다. 동시에 다른 수신기를 실행하고 화면에서 로그를 있습니다.

 

- Exchange

 

이전 튜토리얼에서는 메시지를 큐에서 보내고 큐에서 꺼내 사용하는 내용이였습니다. 아래 내용부터는 Rabbit에서 전체 메세징 모델을 소개합니다.

Producer(
생산자) 메시지를 전송하는 사용자 응용 프로그램입니다.

Queue() 메시지를 저장하는 버퍼입니다.

Consumer(소비자) 메시지를 받는 사용자 응용 프로그램입니다.

 

RabbitMQ 메시징 모델의 핵심은 Producer Queue 직접 메시지를 보내지 않는다는 것입니다. Producert 메시지가 Queue Delivery 되는지도 확인 없습니다.

대신 아래 그림과 같이 Producer X(Exchange)에만 메시지를 보낼 있습니다.

X(Exchange) Producer로부터 메시지를 수신하고 대기열로 메시지를 Push 합니다.

 

동작방식 (출처 : RabbitMQ 홈페이지)

Exchange Type로는 direct, topic, headers, fanout 있습니다. fanout 대해 예제를 하려고 합니다.

 

Fanout 교환은 수신한 모든 메시지를 알고 있는 모든 대기열로 브로드캐스트를 합니다.

 

Exchange 아래와 같이 선언 있습니다.

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')

 

Exchange=” 교환 이름을 설정하는 것이고 기본 설정을 의미합니다. Exchange 값이 입력되지 않은 경우 기본 설정을 사용하게 되고 메시지는 routin_key 값의 이름을 갖는 Queue 전달 됩니다.

 

아래와 같이 이름을 명시한 교환 메시지를 push 예정입니다.

channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)

 

- Temporary Queues

 

Pub / Sub 방식으로 동작할 여러 개의 Suberscirber 동작시키려면 여러 개의 queue 필요합니다. 위의 예시에서 disk logging 프로그램과 screen logging 프로그램 각각 quque 사용해야 하는데 각각의 queue 이름을 queue-disk-loggin,queue-screen-logging 같이 정할 있지만 새로운 프로그램이 추가될 때마다 queue 이름을 계속 짓는 것은 불펼 있습니다.

 

Temporary Queues 기능을 사용하면 Queue 이름을 랜덤으로 지을 있습니다. 또한, exclusive=True 옵션을 사용해서 Subscriber 종료되면 해당 Queue 자동으로 없어지도록 있습니다.

result = channel.queue_declare(queue='', exclusive=True)


위와 같이 설정하면 랜덤으로 지어진 queue 이름은 result.method.queueu 리턴 됩니다.

또한 exclusive=True 옵션에 의해 Subscriber 종료되면 해당 queue 자동으로 지워지게 됩니다.

 

- Temporary Queues

 

Pub / Sub 방식으로 동작할 여러 개의 Suberscirber 동작시키려면 여러 개의 queue 필요합니다. 위의 예시에서 disk logging 프로그램과 screen logging 프로그램 각각 quque 사용해야 하는데 각각의 queue 이름을 queue-disk-loggin,queue-screen-logging 같이 정할 있지만 새로운 프로그램이 추가될 때마다 queue 이름을 계속 짓는 것은 불펼 있습니다.

 

Temporary Queues 기능을 사용하면 Queue 이름을 랜덤으로 지을 있습니다. 또한, exclusive=True 옵션을 사용해서 Subscriber 종료되면 해당 Queue 자동으로 없어지도록 있습니다.

result = channel.queue_declare(queue='', exclusive=True)


위와 같이 설정하면 랜덤으로 지어진 queue 이름은 result.method.queueu 리턴 됩니다.

또한 exclusive=True 옵션에 의해 Subscriber 종료되면 해당 queue 자동으로 지워지게 됩니다.

 

- Bindings

 

Exchange queue 연결하는 작업입니다. 아래와 같이 코딩하면 logs라는 이름의 exchange 랜덤으로 지어진 queue 연결합니다.

 

동작방식 (출처 : RabbitMQ 홈페이지)

 

1.2.3.1.  Python

 

- Python 소스 코드 작성

 

emit_log.py (Pusblish)

#!/usr/bin/env python
import pika
import sys
 
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
 
channel.exchange_declare(exchange='logs', exchange_type='fanout')
 
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(" [x] Sent %r" % message)
connection.close()

 

receive_logs.py (Subscribe)

#!/usr/bin/env python
import pika
 
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
 
channel.exchange_declare(exchange='logs', exchange_type='fanout')
 
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
 
channel.queue_bind(exchange='logs', queue=queue_name)
 
print(' [*] Waiting for logs. To exit press CTRL+C')
 
def callback(ch, method, properties, body):
    print(" [x] %r" % body)
 
channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)
 
channel.start_consuming()

 

- Message Queue테스트

 

# 로그를 파일에 저장하기 (shell 1)

mwno@mwno-VirtualBox:/product/python$ python3 receive_logs.py > logs_from_rabbit.log

 

* buffer 문제로 로그가 안쌓일 있다. 그럴경우 python3 –u receive_logs.py > logs_from_rabbit.log

 

# 로그 출력을 위한 py 실행 (shell 2)

mwno@mwno-VirtualBox:/product/python$ python3 receive_logs.py
 [*] Waiting for logs. To exit press CTRL+C

 

# 로그를 보낸다 (shell 3)

mwno@mwno-VirtualBox:/product/python$ python3 emit_log.py
 [x] Sent 'info: Hello World!'

 

# rabiitmqctl list_bindings 사용하여 코드가 실제로 바인딩과 대기열을 원하는대로 생성하였는지 체크

root@f1f30095da0f:/# rabbitmqctl list_bindings
Listing bindings for vhost /...
source_name     source_kind     destination_name        destination_kind        routing_key     arguments
        exchange        amq.gen-8IR3LgXd6x_gL7kR4h9wHQ  queue   amq.gen-8IR3LgXd6x_gL7kR4h9wHQ  []
        exchange        amq.gen-Sjvop-c0Qd-KlASl_mVOjg  queue   amq.gen-Sjvop-c0Qd-KlASl_mVOjg  []
logs    exchange        amq.gen-8IR3LgXd6x_gL7kR4h9wHQ  queue   amq.gen-8IR3LgXd6x_gL7kR4h9wHQ  []
logs    exchange        amq.gen-Sjvop-c0Qd-KlASl_mVOjg  queue   amq.gen-Sjvop-c0Qd-KlASl_mVOjg  []

 

 

* 위의 명령어를 실행하기 도커 이미지에 접속 해야 합니다.

# 도커 프로세스 조회
mwno@mwno-VirtualBox:/product/python$ sudo docker ps -a
[sudo] password for mwno:
CONTAINER ID   IMAGE                     COMMAND                  CREATED             STATUS             PORTS                                                                                                                                                 NAMES
f1f30095da0f   rabbitmq:3.9-management   "docker-entrypoint.s…"   About an hour ago   Up About an hour   4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, :::5672->5672/tcp, 15671/tcp, 15691-15692/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp, :::15672->15672/tcp   rabbitmq
mwno@mwno-VirtualBox:/product/python$
 
# exec 접속하기
mwno@mwno-VirtualBox:~# sudo docker exec -it f1f30095da0f /bin/bash
 
# 접속 완료
root@f1f30095da0f:/#

# 결과 화면

# 1 로그 파일 확인 (logs_from_rabbit.log)

mwno@mwno-VirtualBox:/product/python$ cat logs_from_rabbit.log
 [*] Waiting for logs. To exit press CTRL+C
 [x] b'info: Hello World!'

# 2 Screen에서 찍히는 메시지 확인

mwno@mwno-VirtualBox:/product/python$ python3 receive_logs.py
 [*] Waiting for logs. To exit press CTRL+C
 [x] b'info: Hello World!'