이번 목차는 work queus에 대한 내용입니다.
시간을 많이 걸리는 작업을 여러 작업자에게 분산하는데 사용 할 작업 대기열을 만들고 테스트 하는 내용입니다. 작업 대기열의 기본 아이디는 리소스 집약적인 작업을 즉시 수행하고 완료될 때까지 기다려야 하는 것을 방지하는 것입니다. 대신 나중에 수행 할 작업을 예약합니다.
1.2.2.1. Python
- Python 소스 코드 작성
new_task.py
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection( pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) message = ' '.join(sys.argv[1:]) or "Hello World!" # 메세지는 arg로 받거나 없으면 Hello World를 보낸다. channel.basic_publish( exchange='', routing_key='task_queue', # 여기서 routing queue는 보낼 queue를 이름을 지정한다. body=message, # body에 메세지를 담아 보낸다. properties=pika.BasicProperties( delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE )) print(" [x] Sent %r" % message) connection.close() |
worker.py
#!/usr/bin/env python import pika import time connection = pika.BlockingConnection( pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) print(' [*] Waiting for messages. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] Received %r" % body.decode()) time.sleep(body.count(b'.')) # body에 .의 갯수만큼 sleep을 준다 print(" [x] Done") ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(queue='task_queue', on_message_callback=callback) channel.start_consuming() |
- Message Queue테스트
# shell 1, worker 실행 (종료 X)
root@master:/product/python# python3 worker.py [*] Waiting for messages. To exit press CTRL+C . |
# shell 2, worker 실행 (종료 X)
root@master:/product/python# python3 worker.py [*] Waiting for messages. To exit press CTRL+C . |
# new_task 실행
root@master:/product/python# python3 new_task.py [x] Sent 'Hello World!' |
# shell 1 , woker 보기
root@master:/product/python# python3 worker.py [*] Waiting for messages. To exit press CTRL+C [x] Received 'Hello World!' [x] Done |
# new_task 여러 번 보내기
root@master:/product/python# python3 new_task.py "First message." [x] Sent 'First message.' root@master:/product/python# python3 new_task.py "Second message.." [x] Sent 'Second message..' root@master:/product/python# python3 new_task.py "Third message..." [x] Sent 'Third message...' root@master:/product/python# python3 new_task.py "Fourth message...." [x] Sent 'Fourth message....' root@master:/product/python# python3 new_task.py "Fifth message....." [x] Sent 'Fifth message.....' |
# shell 1, woker 보기
root@master:/product/python# python3 worker.py [*] Waiting for messages. To exit press CTRL+C [x] Received 'Hello World!' [x] Done [x] Received 'Second message..' [x] Done [x] Received 'Fourth message....' [x] Done |
# shell 2, woker 보기
root@master:/product/python# python3 worker.py [*] Waiting for messages. To exit press CTRL+C [x] Received 'First message.' [x] Done [x] Received 'Third message...' [x] Done [x] Received 'Fifth message.....' [x] Done |
기본적으로 RabbitMQ는 각 메시지를 라운드 로빈 방식으로 전송 하는 것을 확인 할 수 있었습니다.
- RqbbitMQ Console 확인
‘queue’ 태그를 확인 해 보면 overview 에 task_queue라는 queue를 확인 할 수 있다.
task_queue를 누르면 현재 들어온 queued messages를 확인 할 수있습니다.
위의 그림처럼 queue가 들어오는 것을 확인 할 수 있습니다.
- Message 승인 테스트
기본적으로 queue의 동작 방식은 Consumer에게 메시지가 전달하면 즉시 메모리로부터 그 메시지를 제거 합니다. 만약 오래 걸리는 작업에서 sender가 메시지를 부분적으로 전달하고 죽게되면 해당 메시지를 잃어 버리게 됩니다.
이러한 내용을 방지하기 위해 RabbitMQ에서는 Message Acknowledgement를 제공합니다. Conusmer는 RabbitMQ에게 ack를 되돌려 주는데 이는 특정 메시지가 처리되었음을 알리고 그 알림을 받은 메시지에 대하서 RabbitMQ가 지울 수 있도록 해줍니다. 만약 Conusmer가 ack 전송없이 죽게된다면 RabiitMQ는 그 메시지가 완전하게 처리되지 않았다고 판단하여 다시 큐잉 합니다. 만약 다른 Consumer가 존재한다면 그 메시지를 다른 Consumer에게 재 전송 합니다. (Message TimeOut X) 이러한 기능은 메시지 처리가 오래 걸리는 업무에서 유용한 기능입니다.
Consumer delivery ack에는 시간 초과(30분)가 적용 됩니다. 이는 메시지 delivery를 확인하지 않은 버그가 있는 Consumer를 감지하는데 도움이 됩니다. 기본적으로 수동 메세지 승인(Manual Message Acknowledgements) 활성화 되어 있습니다.
- Message Queue테스트
# Long Time message 보내기
root@master:/product/python# python3 new_task.py "First message." [x] Sent 'First message.' root@master:/product/python# python3 new_task.py "Second message.." [x] Sent 'Second message..' root@master:/product/python# python3 new_task.py "Long Time message........................" [x] Sent 'Long Time message........................' |
위와 같이 메시지 전송 시 shell1의 worker에서 First message를 받고, shell2의 worker에서 Second message를 받습니다. 이후 Long Time Message는 다시 shell1의 worker에서 받는데 해당 message를 받는 도중에 worker를 kill 하면 shell1의 worker에서는 아래와 같이 출력 됩니다.
# shell1, worker
root@master:/product/python# python3 worker.py [*] Waiting for messages. To exit press CTRL+C [x] Received 'First message.' [x] Done [x] Received 'Long Time message........................' # worker kill 하기 ^CTraceback (most recent call last): File "/product/python/worker.py", line 23, in <module> channel.start_consuming() File "/usr/local/lib/python3.9/dist-packages/pika/adapters/blocking_connection.py", line 1865, in start_consuming self._process_data_events(time_limit=None) File "/usr/local/lib/python3.9/dist-packages/pika/adapters/blocking_connection.py", line 2026, in _process_data_events self.connection.process_data_events(time_limit=time_limit) File "/usr/local/lib/python3.9/dist-packages/pika/adapters/blocking_connection.py", line 833, in process_data_events self._dispatch_channel_events() File "/usr/local/lib/python3.9/dist-packages/pika/adapters/blocking_connection.py", line 567, in _dispatch_channel_events impl_channel._get_cookie()._dispatch_events() File "/usr/local/lib/python3.9/dist-packages/pika/adapters/blocking_connection.py", line 1492, in _dispatch_events consumer_info.on_message_callback(self, evt.method, File "/product/python/worker.py", line 15, in callback time.sleep(body.count(b'.')) # body에 .의 갯수만큼 sleep을 준다 KeyboardInterrupt |
Message Acknowledgement 기능으로 인해 shell2의 worker가 존재하여 해당 worker에서 Long Time message를 처리하게 됩니다.
# shell2, worker
root@master:/product/python# python3 worker.py [*] Waiting for messages. To exit press CTRL+C [x] Received 'Second message..' [x] Done [x] Received 'Long Time message........................' [x] Done |
- Message durability
RabbitMQ Server가 중지(충돌 포함)되면 Task은 손실되게 됩니다. 메시지가 손실되지 않도록 하기 위해서는 두가지의 설정 구성이 필요합니다.
첫번째 구성, RabiitMQ Server를 다시 시작해도 Queue가 살아남을 수 있도록 설정이 필요합니다. Produser와 Consumer 코드 두 곳다 설정 되어 있어야 합니다.
channel.queue_declare(queue='task_queue', durable=True) |
두번째 구성, pika.spec.PERSISTENT_DELIVERY_MODE 값으로 delivery_mode 속성을 제공하여 메시지를 지속성으로 표시해야 합니다.
channel.basic_publish(exchange='', routing_key="task_queue", body=message, properties=pika.BasicProperties( delivery_mode = pika.spec.PERSISTENT_DELIVERY_MODE )) |
* 메시지를 지속적으로 표시한다고 해서 메시지가 손실되지 않는다는 보장은 없습니다.
- Fair dispatch
기본적으로 라운드 로빈 방식으로 동작을 하기 되는데, 메시지를 보내는 순서에 따라 1번 worker에 만 작업 시간이 긴 메시지가 몰리고 2번 worker에는 간단한 메시지만 보내질 수 있습니다. RabbitM
Q에서는 해당 내용에 대해 알지 못합니다. 이를 방지하기 위해 prefetch_count=1 설정과 함께 Channel#basic_qos 채널 방법을 사용 할 수 있습니다. basic.qos 프로토콜 방법을 사용하여 RabbitMQ가 작업자에게 한 번에 둘 이상의 메시지를 제공하지 않도록 지시합니다.
예시)
아래와 같이 Message를 발송한다.
1번 메시지 (40초)
2번 메시지 (2초)
3번 메시지 (3초)
4번 메시지 (3초)
기존 방식이라면 1번 worker에 1,3 번 메시지가, 2번 worker에 2,4번 메시지가 전달 되지만 위의 옵션을 사용 시 1번 worker에서 1번 메시지를 처리하는 동안 2번 worker에서 2,3,4 메시지를 처리하게 됩니다.
'오픈소스 > RabbitMQ' 카테고리의 다른 글
RabbitMQ Tutorials - Routing (6) (0) | 2022.04.04 |
---|---|
RabbitMQ Tutorials - Publish/Subscribe (5) (0) | 2022.04.04 |
RabbitMQ Tutorials - Hello Wolrd (3) (0) | 2022.04.04 |
RabbitMQ Tutorials - Python 환경구성 (2) (0) | 2022.04.04 |
RabbitMQ Tutorials - Downloading and install RabbitMQ (1) (0) | 2022.04.04 |