개인 저장용 블로그

RabbitMQ Tutorials - Work queues (4) 본문

오픈소스/RabbitMQ

RabbitMQ Tutorials - Work queues (4)

우엉잇 2022. 4. 4. 10:23
이번 목차는 work queus에 대한 내용입니다. 

 시간을 많이 걸리는 작업을 여러 작업자에게 분산하는데 사용 작업 대기열을 만들고 테스트 하는 내용입니다. 작업 대기열의 기본 아이디는 리소스 집약적인 작업을 즉시 수행하고 완료될 때까지 기다려야 하는 것을 방지하는 것입니다. 대신 나중에 수행 작업을 예약합니다.

 

동작 방식 (출처 : RabbitMQ 홈페이지)

 

1.2.2.1.  Python

 

- Python 소스 코드 작성

 

new_task.py

#!/usr/bin/env python
import pika
import sys
 
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='task_queue', durable=True)
 
message = ' '.join(sys.argv[1:]) or "Hello World!"  # 메세지는 arg 받거나 없으면 Hello World 보낸다.
channel.basic_publish(
    exchange='',
    routing_key='task_queue', # 여기서 routing queue 보낼 queue 이름을 지정한다.
    body=message,              # body 메세지를 담아 보낸다.
    properties=pika.BasicProperties(
        delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE
    ))
print(" [x] Sent %r" % message)
connection.close()

 

worker.py

#!/usr/bin/env python
import pika
import time
 
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
 
 
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body.decode())
    time.sleep(body.count(b'.'))  # body . 갯수만큼 sleep 준다
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)
 
 
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)
 
channel.start_consuming()

 

- Message Queue테스트

 

# shell 1, worker 실행 (종료 X)

root@master:/product/python# python3 worker.py
 [*] Waiting for messages. To exit press CTRL+C
.

# shell 2, worker 실행 (종료 X)

root@master:/product/python# python3 worker.py
 [*] Waiting for messages. To exit press CTRL+C
.

 

# new_task 실행

root@master:/product/python# python3 new_task.py
 [x] Sent 'Hello World!'

 

#  shell 1 , woker 보기

root@master:/product/python# python3 worker.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Hello World!'
 [x] Done

 

# new_task 여러 보내기

root@master:/product/python# python3 new_task.py "First message."
 [x] Sent 'First message.'
 
root@master:/product/python# python3 new_task.py "Second message.."
 [x] Sent 'Second message..'
 
root@master:/product/python# python3 new_task.py "Third message..."
 [x] Sent 'Third message...'
 
root@master:/product/python# python3 new_task.py "Fourth message...."
 [x] Sent 'Fourth message....'
 
root@master:/product/python# python3 new_task.py "Fifth message....."
 [x] Sent 'Fifth message.....'
 

 

# shell 1, woker 보기

root@master:/product/python# python3 worker.py
 [*] Waiting for messages. To exit press CTRL+C
 
 [x] Received 'Hello World!'
 [x] Done
 [x] Received 'Second message..'
 [x] Done
 [x] Received 'Fourth message....'
 [x] Done

# shell 2, woker 보기

root@master:/product/python# python3 worker.py
 [*] Waiting for messages. To exit press CTRL+C
 
 [x] Received 'First message.'
 [x] Done
 [x] Received 'Third message...'
 [x] Done
 [x] Received 'Fifth message.....'
 [x] Done

기본적으로 RabbitMQ 메시지를 라운드 로빈 방식으로 전송 하는 것을 확인 있었습니다.

 

-  RqbbitMQ Console 확인

 

 

‘queue’ 태그를 확인 보면 overview task_queue라는 queue 확인 있다. 

 task_queue 누르면 현재 들어온 queued messages 확인 수있습니다.

 

 

위의 그림처럼 queue가 들어오는 것을 확인 할 수 있습니다. 

 

- Message 승인 테스트

 

기본적으로 queue 동작 방식은 Consumer에게 메시지가 전달하면 즉시 메모리로부터 메시지를 제거 합니다. 만약 오래 걸리는 작업에서 sender 메시지를 부분적으로 전달하고 죽게되면 해당 메시지를 잃어 버리게 됩니다.


 
이러한 내용을 방지하기 위해 RabbitMQ에서는 Message Acknowledgement 제공합니다. Conusmer RabbitMQ에게 ack 되돌려 주는데 이는 특정 메시지가 처리되었음을 알리고 알림을 받은 메시지에 대하서  RabbitMQ 지울 있도록 해줍니다. 만약 Conusmer ack 전송없이 죽게된다면 RabiitMQ 메시지가 완전하게 처리되지 않았다고 판단하여 다시 큐잉 합니다. 만약 다른 Consumer 존재한다면 메시지를 다른 Consumer에게 전송 합니다. (Message TimeOut X) 이러한 기능은 메시지 처리가 오래 걸리는 업무에서 유용한 기능입니다.

 

 Consumer delivery ack에는 시간 초과(30) 적용 됩니다. 이는 메시지 delivery 확인하지 않은 버그가 있는 Consumer 감지하는데 도움이 됩니다. 기본적으로 수동 메세지 승인(Manual Message Acknowledgements) 활성화 되어 있습니다.

 

- Message Queue테스트

 

# Long Time message 보내기

root@master:/product/python# python3 new_task.py  "First message."
 [x] Sent 'First message.'
 
root@master:/product/python# python3 new_task.py  "Second message.."
 [x] Sent 'Second message..'
 
root@master:/product/python# python3 new_task.py  "Long Time message........................"
 [x] Sent 'Long Time message........................'

위와 같이 메시지 전송 shell1 worker에서 First message 받고, shell2 worker에서 Second message 받습니다. 이후 Long Time Message 다시 shell1 worker에서 받는데 해당 message 받는 도중에 worker kill 하면 shell1 worker에서는 아래와 같이 출력 됩니다.

 

# shell1, worker

root@master:/product/python# python3 worker.py
 [*] Waiting for messages. To exit press CTRL+C
 
 [x] Received 'First message.'
 [x] Done
 
 [x] Received 'Long Time message........................'   # worker kill 하기
^CTraceback (most recent call last):
  File "/product/python/worker.py", line 23, in <module>
    channel.start_consuming()
  File "/usr/local/lib/python3.9/dist-packages/pika/adapters/blocking_connection.py", line 1865, in start_consuming
    self._process_data_events(time_limit=None)
  File "/usr/local/lib/python3.9/dist-packages/pika/adapters/blocking_connection.py", line 2026, in _process_data_events
    self.connection.process_data_events(time_limit=time_limit)
  File "/usr/local/lib/python3.9/dist-packages/pika/adapters/blocking_connection.py", line 833, in process_data_events
    self._dispatch_channel_events()
  File "/usr/local/lib/python3.9/dist-packages/pika/adapters/blocking_connection.py", line 567, in _dispatch_channel_events
    impl_channel._get_cookie()._dispatch_events()
  File "/usr/local/lib/python3.9/dist-packages/pika/adapters/blocking_connection.py", line 1492, in _dispatch_events
    consumer_info.on_message_callback(self, evt.method,
  File "/product/python/worker.py", line 15, in callback
    time.sleep(body.count(b'.'))  # body . 갯수만큼 sleep 준다
KeyboardInterrupt

 

Message Acknowledgement 기능으로 인해 shell2 worker 존재하여 해당 worker에서 Long Time message 처리하게 됩니다.

 

# shell2, worker

root@master:/product/python# python3 worker.py
 [*] Waiting for messages. To exit press CTRL+C
 
 [x] Received 'Second message..'
 [x] Done
 
 [x] Received 'Long Time message........................'
 [x] Done

- Message durability

 

RabbitMQ Server 중지(충돌 포함)되면 Task 손실되게 됩니다. 메시지가 손실되지 않도록 하기 위해서는 두가지의 설정 구성이 필요합니다.

 

첫번째 구성,  RabiitMQ Server 다시 시작해도 Queue 살아남을 있도록 설정이 필요합니다. Produser Consumer 코드 곳다 설정 되어 있어야 합니다.

channel.queue_declare(queue='task_queue', durable=True)

 

두번째 구성, pika.spec.PERSISTENT_DELIVERY_MODE 값으로 delivery_mode 속성을 제공하여 메시지를 지속성으로 표시해야 합니다.

channel.basic_publish(exchange='',
                      routing_key="task_queue",
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = pika.spec.PERSISTENT_DELIVERY_MODE
                      ))

 

* 메시지를 지속적으로 표시한다고 해서 메시지가 손실되지 않는다는 보장은 없습니다.

 

- Fair dispatch

 

기본적으로 라운드 로빈 방식으로 동작을 하기 되는데, 메시지를 보내는 순서에 따라 1 worker 작업 시간이 메시지가 몰리고 2 worker에는 간단한 메시지만 보내질 있습니다. RabbitM
Q
에서는 해당 내용에 대해 알지 못합니다. 이를 방지하기 위해 prefetch_count=1 설정과 함께 Channel#basic_qos 채널 방법을 사용 있습니다. basic.qos 프로토콜 방법을 사용하여 RabbitMQ 작업자에게 번에 이상의 메시지를 제공하지 않도록 지시합니다.

 

예시)

아래와 같이 Message 발송한다.

 

1 메시지 (40)

2 메시지 (2)

3 메시지 (3)

4 메시지 (3)

 

기존 방식이라면 1 worker 1,3 메시지가, 2 worker 2,4 메시지가 전달 되지만 위의 옵션을 사용 1 worker에서 1 메시지를 처리하는 동안 2 worker에서 2,3,4 메시지를 처리하게 됩니다.