이번 목차는 Routing 에 대한 내용입니다.
이전 소단원에서는 pub/sub를 이용하여 단순한 메시지를 모든 수신자에게 전달 하였습니다.
해당 소단원에서는 Routing 기능을 통해 특정 수신자에게 특정 메시지를 보내는 테스트를 할 예정
입니다.
예를 들어, 앞선 예저의 로깅 시스템에서 Disk Logging 프로그램은 Disk의 용량 절약을 목적으로 Critical한 메세지만 수신하여 파일로 떨구고 싶은 경우에 Routing 기능을 사용 할 수 있습니다.
- Bindings
이전 소단원에서 아래와 같은 바인드 코드를 사용했습니다.
channel.queue_bind(exchange=exchange_name, queue=queue_name) |
바인딩은 매개 변수로 route_key를 사용 할 수 있습니다. Basic_publish 매개 변수 와의 혼동을 피하기 위해 Binding key 라고 합니다. 아래 코드는 키로 바인딩을 만드는 방법입니다.
channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key='black') |
바인딩 키는 Exchange Type의 영향을 받습니다. Type이 fanout 인 경우에 바인딩 키가 무시됩니다.
따라서 아래 예제는 Exchange를 생성 할 때, 타입을 direct로 설정합니다.
- Direct exchange
아래 그림과 같이 Direct 타입의 exchange를 사용하여 로깅 시스템을 구현 할 예정입니다.
- Emitting logs
Direct Type의 exchange를 사용하기 위해서는 아래와 같은 설정이 필요합니다.
로그 severity를 routing_key로 설정합니다.
channel.exchange_declare(exchange='direct_logs', exchange_type='direct') |
publish 할 때 아래와 같이 routing_key에 값을 추가해서 해당 키워드 일 때 메시지를 전송합니다.
channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message) |
우리가 설정 할 키워드는 serverity이고 배열 값입니다.
severity = sys.argv[1] if len(sys.argv) > 1 else 'info' message = ' '.join(sys.argv[2:]) or 'Hello World!' |
첫 번째 매개변수로 들어오는 값은 로그레벨이며, null 인 경우 해당 값은 “info”로 설정됩니다.
두 번째 매개변수로 들어오는 값은 메시지 내용이며, null 인 경우 Hello World! 를 찍습니다.
1.2.4.1. Python
- Python 소스 코드 작성
emit_log_direct.py
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection( pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', exchange_type='direct') severity = sys.argv[1] if len(sys.argv) > 1 else 'info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish( exchange='direct_logs', routing_key=severity, body=message) print(" [x] Sent %r:%r" % (severity, message)) connection.close() |
recive_logs_direct.py
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection( pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', exchange_type='direct') result = channel.queue_declare(queue='', exclusive=True) queue_name = result.method.queue severities = sys.argv[1:] if not severities: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1) for severity in severities: channel.queue_bind( exchange='direct_logs', queue=queue_name, routing_key=severity) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume( queue=queue_name, on_message_callback=callback, auto_ack=True) channel.start_consuming() |
위의 내용을 간략히 설명하면 warning으로 로그레벨이 넘어오면 Q1으로, error으로 로그레벨이 넘어오면 Q2로 메세징을 큐잉합니다.
- Message Queue테스트
테스트 케이스1) 특정 로그레벨(error, warning) 로 들어올 때 리다이렉트를 이용해 로그 저장
# 로그 receive 실행
mwno@mwno-VirtualBox:/product/python$ python3 -u receive_logs_direct.py warning error > logs_from_rabbit.log |
# 4개의 메시지를 전송 (error 1, info 2, warning 1)
mwno@mwno-VirtualBox:/product/python$ python3 emit_log_direct.py error "Run. Run. Error Message Detected." [x] Sent 'error':'Run. Run. Error Message Detected.' mwno@mwno-VirtualBox:/product/python$ python3 emit_log_direct.py info "This is Info Message. Don't worry." [x] Sent 'info':"This is Info Message. Don't worry." mwno@mwno-VirtualBox:/product/python$ python3 emit_log_direct.py warning "Run. Run. Or it will explode." [x] Sent 'warning':'Run. Run. Or it will explode.' mwno@mwno-VirtualBox:/product/python$ python3 emit_log_direct.py info "This is Info Message. Don't worry." [x] Sent 'info':"This is Info Message. Don't worry." |
# logs_from_rabbit.log 확인
mwno@mwno-VirtualBox:/product/python$ cat logs_from_rabbit.log [*] Waiting for logs. To exit press CTRL+C [x] 'error':b'Run. Run. Error Message Detected.' [x] 'warning':b'Run. Run. Or it will explode.' |
* Error 및 Warning 메시지만 로그에 기록된 것을 확인 할 수 있다.
----------------------------------------------------------------------------------------------------------------------------------------------
테스트 케이스2) 특정 로그레벨(error, warning)로 들어올 떄 화면에 찍는 법
# 로그 receive 실행
mwno@mwno-VirtualBox:/product/python$ python3 receive_logs_direct.py warning error [*] Waiting for logs. To exit press CTRL+C |
# 메시지 전송
mwno@mwno-VirtualBox:/product/python$ python3 emit_log_direct.py error "Run. Run. Error Message Detected." [x] Sent 'error':'Run. Run. Error Message Detected.' mwno@mwno-VirtualBox:/product/python$ python3 emit_log_direct.py info "This is Info Message. Don't worry." [x] Sent 'info':"This is Info Message. Don't worry." mwno@mwno-VirtualBox:/product/python$ python3 emit_log_direct.py warning "Run. Run. Or it will explode." [x] Sent 'warning':'Run. Run. Or it will explode.' mwno@mwno-VirtualBox:/product/python$ python3 emit_log_direct.py info "This is Info Message. Don't worry." [x] Sent 'info':"This is Info Message. Don't worry." |
# 로그 receive 확인
mwno@mwno-VirtualBox:/product/python$ python3 receive_logs_direct.py warning error [*] Waiting for logs. To exit press CTRL+C [x] 'error':b'Run. Run. Error Message Detected.' [x] 'warning':b'Run. Run. Or it will explode.' |
* Error 및 Warning 메시지만 로그에 기록된 것을 확인 할 수 있다.
'오픈소스 > RabbitMQ' 카테고리의 다른 글
RabbitMQ Tutorials - RPC (8) (0) | 2022.04.04 |
---|---|
RabbitMQ Tutorials - Topics (7) (0) | 2022.04.04 |
RabbitMQ Tutorials - Publish/Subscribe (5) (0) | 2022.04.04 |
RabbitMQ Tutorials - Work queues (4) (0) | 2022.04.04 |
RabbitMQ Tutorials - Hello Wolrd (3) (0) | 2022.04.04 |