개인 저장용 블로그

RabbitMQ Tutorials - Routing (6) 본문

오픈소스/RabbitMQ

RabbitMQ Tutorials - Routing (6)

우엉잇 2022. 4. 4. 13:20
이번 목차는 Routing 에 대한 내용입니다. 

 

 이전 소단원에서는 pub/sub 이용하여 단순한 메시지를 모든 수신자에게 전달 하였습니다.

해당 소단원에서는 Routing 기능을 통해 특정 수신자에게 특정 메시지를 보내는 테스트를 예정
입니다.

 

예를 들어, 앞선 예저의 로깅 시스템에서 Disk Logging 프로그램은 Disk 용량 절약을 목적으로 Critical 메세지만 수신하여 파일로 떨구고 싶은 경우에 Routing 기능을 사용 있습니다.

- Bindings

이전 소단원에서 아래와 같은 바인드 코드를 사용했습니다.  

channel.queue_bind(exchange=exchange_name,
                   queue=queue_name)


바인딩은 매개 변수로 route_key 사용 있습니다. Basic_publish 매개 변수 와의 혼동을 피하기 위해 Binding key 라고 합니다. 아래 코드는 키로 바인딩을 만드는 방법입니다. 

channel.queue_bind(exchange=exchange_name,
                   queue=queue_name,
                   routing_key='black')


바인딩 키는 Exchange Type 영향을 받습니다. Type fanout 경우에 바인딩 키가 무시됩니다.
따라서 아래 예제는 Exchange 생성 , 타입을 direct 설정합니다.

 

- Direct exchange

 

아래 그림과 같이 Direct 타입의 exchange 사용하여 로깅 시스템을 구현 예정입니다.

 

동작 방식 (출처 : RabbitMQ 홈페이지)

 

- Emitting logs

 

Direct Type exchange 사용하기 위해서는 아래와 같은 설정이 필요합니다.

로그 severity routing_key 설정합니다.

channel.exchange_declare(exchange='direct_logs',
                         exchange_type='direct')


publish
아래와 같이 routing_key 값을 추가해서 해당 키워드 메시지를 전송합니다.

channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)


우리가 설정 키워드는 serverity이고 배열 값입니다.

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'


번째 매개변수로 들어오는 값은 로그레벨이며, null 경우 해당 값은 “info” 설정됩니다.

 

번째 매개변수로 들어오는 값은 메시지 내용이며, null 경우 Hello World! 찍습니다.

 

1.2.4.1.  Python

 

- Python 소스 코드 작성


emit_log_direct.py

#!/usr/bin/env python
import pika
import sys
 
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
 
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
 
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(
    exchange='direct_logs', routing_key=severity, body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()


recive_logs_direct.py

#!/usr/bin/env python
import pika
import sys
 
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
 
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
 
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
 
severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)
 
for severity in severities:
    channel.queue_bind(
        exchange='direct_logs', queue=queue_name, routing_key=severity)
 
print(' [*] Waiting for logs. To exit press CTRL+C')
 
 
def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))
 
 
channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)
 
channel.start_consuming()

위의 내용을 간략히 설명하면 warning으로 로그레벨이 넘어오면 Q1으로, error으로 로그레벨이 넘어오면 Q2 메세징을 큐잉합니다. 

- Message Queue테스트

 

테스트 케이스1) 특정 로그레벨(error, warning) 들어올 리다이렉트를 이용해 로그 저장

 

# 로그 receive 실행

mwno@mwno-VirtualBox:/product/python$ python3 -u receive_logs_direct.py warning error > logs_from_rabbit.log

 

# 4개의 메시지를 전송 (error 1, info 2, warning 1)

mwno@mwno-VirtualBox:/product/python$ python3 emit_log_direct.py error "Run. Run. Error Message Detected."
 [x] Sent 'error':'Run. Run. Error Message Detected.'
 
mwno@mwno-VirtualBox:/product/python$ python3 emit_log_direct.py info "This is Info Message. Don't worry."
 [x] Sent 'info':"This is Info Message. Don't worry."
 
mwno@mwno-VirtualBox:/product/python$ python3 emit_log_direct.py warning "Run. Run. Or it will explode."
 [x] Sent 'warning':'Run. Run. Or it will explode.'
 
mwno@mwno-VirtualBox:/product/python$ python3 emit_log_direct.py info "This is Info Message. Don't worry."
 [x] Sent 'info':"This is Info Message. Don't worry."

 

# logs_from_rabbit.log 확인

mwno@mwno-VirtualBox:/product/python$ cat logs_from_rabbit.log
 [*] Waiting for logs. To exit press CTRL+C
 [x] 'error':b'Run. Run. Error Message Detected.'
 [x] 'warning':b'Run. Run. Or it will explode.'

* Error Warning 메시지만 로그에 기록된 것을 확인 있다.

 

----------------------------------------------------------------------------------------------------------------------------------------------
테스트 케이스2) 특정 로그레벨(error, warning) 들어올 화면에 찍는

 

# 로그 receive 실행

mwno@mwno-VirtualBox:/product/python$ python3 receive_logs_direct.py warning error
 [*] Waiting for logs. To exit press CTRL+C

 

# 메시지 전송

mwno@mwno-VirtualBox:/product/python$ python3 emit_log_direct.py error "Run. Run. Error Message Detected."
 [x] Sent 'error':'Run. Run. Error Message Detected.'
 
mwno@mwno-VirtualBox:/product/python$ python3 emit_log_direct.py info "This is Info Message. Don't worry."
 [x] Sent 'info':"This is Info Message. Don't worry."
 
mwno@mwno-VirtualBox:/product/python$ python3 emit_log_direct.py warning "Run. Run. Or it will explode."
 [x] Sent 'warning':'Run. Run. Or it will explode.'
 
mwno@mwno-VirtualBox:/product/python$ python3 emit_log_direct.py info "This is Info Message. Don't worry."
 [x] Sent 'info':"This is Info Message. Don't worry."

# 로그 receive 확인

mwno@mwno-VirtualBox:/product/python$ python3 receive_logs_direct.py warning error
 [*] Waiting for logs. To exit press CTRL+C
 [x] 'error':b'Run. Run. Error Message Detected.'
 [x] 'warning':b'Run. Run. Or it will explode.'

* Error  Warning 메시지만 로그에 기록된 것을 확인   있다.

 

 

 

'오픈소스 > RabbitMQ' 카테고리의 다른 글

RabbitMQ Tutorials - RPC (8)  (0) 2022.04.04
RabbitMQ Tutorials - Topics (7)  (0) 2022.04.04
RabbitMQ Tutorials - Publish/Subscribe (5)  (0) 2022.04.04
RabbitMQ Tutorials - Work queues (4)  (0) 2022.04.04
RabbitMQ Tutorials - Hello Wolrd (3)  (0) 2022.04.04