개인 저장용 블로그

RabbitMQ Tutorials - RPC (8) 본문

오픈소스/RabbitMQ

RabbitMQ Tutorials - RPC (8)

우엉잇 2022. 4. 4. 13:29
이번 목차는 RPC 에 대한 내용입니다. 

 

 해당 소단원을 간략히 설명하면 클라이언트는는 Request Server 전달하고 Server 해당 Request 처리하여 알맞은 결과 값을 다시 클라이언트는에 Response  해주는 방법입니다. , RPC 원격지에 메시지를 전달해서 결과를 다시 전달 받는 역할을 한다.

- Message Properties

 

delivery_mode : 메시지 속성이 persistent, transtient 표시해주는데 사용됩니다. 

contend_type : 인코딩의 MIME 유형을 설명하는데 사용됩니다.

reply_to : 롤백 큐의 이름을 지정하는데 사용됩니다.
correlation_id : RPC
응답을 요청과 상관시키는데 유용합니다.

 

- RPC 동작방식

동작 방식 (출처 : RabbitMQ 홈페이지)

. 클라이언트가 시작되면 익명의 배타적 콜백 대기열을 생성합니다.

. RPC 요청의 경우 클라이언트는 콜백 대기열로 설정되는 reply_to 모든 요청에 대해 고유값으로 설정되는 correlation_id 가지 속성이 있는 메시지를 보냅니다.

. 요청은 rpc_queue 대기열로 전송됩니다.

. RPC 작업자(서버) 해당 큐에서 요청을 기다리고, 요청이 오면 작업을 수행하고 reply_to 필드의 대기열을 사용하여 결과가 포함된 메시지를 클라이언트로 다시 보냅니다.

. 클라이언트는 콜백 큐에서 데이터를 기다립니다. 메시지가 나타나면 correlation_id 속성을 확인 요청값과 일치하면 어플리케이션에 응답을 반환 합니다.

 

RPC 이점으로는 다음과 같다.

- RPC서버가 너무 느린 경우 RPC 서버를 하나 띄워 확장 시킬 있습니다.

- 클라이언트 측에서 RPC 하나의 메시지만 보내고 받기 떄문에 queue_declare 같이 동기 호출이 필요하지 않습니다. 결과적으로 RPC 클라이언트는 단일 RPC 요청에 대해 한번의 네트워크 왕복만 있으면 됩니다.



1.2.6.1.  Python

 

- Python 소스 코드 작성


rpc_server.py

#!/usr/bin/env python
import pika
 
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
 
channel = connection.channel()
 
channel.queue_declare(queue='rpc_queue')
 
def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)
 
def on_request(ch, method, props, body):
    n = int(body)
 
    print(" [.] fib(%s)" % n)
    response = fib(n)
 
    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id = \
                                                         props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)
 
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)
 
print(" [x] Awaiting RPC requests")
channel.start_consuming()


rpc_client.py

#!/usr/bin/env python
import pika
import uuid
 
class FibonacciRpcClient(object):
 
    def __init__(self):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='localhost'))
 
        self.channel = self.connection.channel()
 
        result = self.channel.queue_declare(queue='', exclusive=True)
        self.callback_queue = result.method.queue
 
        self.channel.basic_consume(
            queue=self.callback_queue,
            on_message_callback=self.on_response,
            auto_ack=True)
 
    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body
 
    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(
            exchange='',
            routing_key='rpc_queue',
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                correlation_id=self.corr_id,
            ),
            body=str(n))
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)
 
 
fibonacci_rpc = FibonacciRpcClient()
 
print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)

- Message Queue테스트

# rpc_server 실행

mwno@mwno-VirtualBox:/product/python$ python3 rpc_server.py
 [x] Awaiting RPC requests

 

# client 실행

mwno@mwno-VirtualBox:/product/python$ python3 rpc_client.py
 [x] Requesting fib(30)
 [.] Got 832040

 

# rpc_server 확인

mwno@mwno-VirtualBox:/product/python$ python3 rpc_server.py
 [x] Awaiting RPC requests
 [.] fib(30)