개발/오늘 배운 지식

[FastAPI/Python] 양방향 통신을 위한 웹소켓 in FastAPI

Woogie2 2022. 8. 12. 16:27
반응형

Chapter 8: 양방향 통신을 위한 웹소켓 in FastAPI

| “Building Data Science Applications with FastAPI”의 챕터 8을 공부하며 정리한 내용입니다.

HyperText Transfer Protocol (HTTP)는 서버에 데이터를 보내고 받기에 간편하지만 강력한 기술이다.

하지만 채팅과 같은 양방향 소통이 중요한 경우 HTTP로 구현한다면 상당히 불편함이 따른다. 짧은 시간 내에 많은 요청과 응답을 만들어야 하기에, 자원 낭비가 심할 것이다.

이러한 HTTP 단점 때문에 새로운 프로토콜 웹소켓(WebSocket)이 인기를 끌고 있다. 이 프로토콜의 목표는 client와 server 사이에 communication channel을 열어서 그들이 실시간으로 양방향 소통을 할 수 있게 하는 것이다.

이 챕터에서 다루게 될 주제는 아래와 같다.

  • 웹소켓의 양방향 통신의 원리를 이해하기
  • FastAPI에서 웹소켓을 생성하기
  • 여러 개의 웹소켓 연결을 다루고, 메세지를 broadcasting 하기

환경 설정하기

이 챕터에서는 로컬 PC에 레디스 서버가 동작하고 있어야 한다. 도커를 활용하므로 도커도 미리 설치하자. 아래의 명령어를 도커 daemon을 켠 상태에서 실행한다.

$ docker run -d --name fastapi-redis -p 6379:6379 redis

이제 local 컴퓨터의 6379 포트에서 redis 서버에 접근할 수 있다.

웹소켓의 양방향 통신의 원리를 이해하기

웹소켓이라는 이름에서 알 수 있듯이 유닉스 시스템의 소켓과 밀접한 연관이 있다. 기술적으로는 서로 연관이 없지만, 두가지 모두 이루고 싶은 목표가 동일하다. 그 목표는 두 애플리케이션 사이에 통신 채널을 여는 것이다.

웹소켓은 전이중 통신 채널을 열어서 문제를 해결하고자 한다. 즉, 메세지가 두 가지 방향 모두에 전달이 가능하고, 동시에 전달이 가능하다는 뜻이다. 일단 채널이 열리면, 서버는 client 요청이 없어도 메시지를 client에 보낼 수 있다.

HTTP와 WebSocket은 다른 프로토콜이지만, 웹소켓은 HTTP와 함께 동작할 수 있도록 설게되었다. 사실, 웹소켓이 처음 열릴 때 HTTP 요청을 통해서 시작되고, 이후에 WebSocket tunnel로 업그레이드된다. 이 과정 덕분에 전통적인 80,443 포트와도 즉시 호환이 된다.

웹소켓은 Uniform Resoruce Identifiers (URIs)를 이용한다는 점에서도 HTTP와 유사하다. 따라서 URIs를 통해 host, path, query parameter 등을 구별할 수 있다. 게다가 http, https 처럼 ws(WebSocket), wss(WebSocket Secure)이 존재한다. wss는 Secure Sockets Layer/Transport Layer Security (SSL/TLS)-encrypted connection을 위한 것이다.

이렇게 유사한 점들이 있지만, 양방향 대화 채널을 다루는 방법은 기존의 HTTP request를 다루는 방식과는 꽤나 다르다. 그래서 생각하는 방식을 기존과는 달리해야한다. FastAPI는 비동기적 특성을 가진 웹소켓 구현체를 제공하기에 우리에게 큰 도움이 된다.

FastAPI에서 웹소켓 생성하기

FastAPI에서는 기본적으로 웹소켓을 지원하기 때문에 생성하는 것은 쉽다. 하지만, 웹소켓을 활용해서 새로운 로직을 추가하는 것은 복잡하다.

from fastapi import FastAPI, WebSocket
from starlette.websockets import WebSocketDisconnect

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"Message text was: {data}")
    except WebSocketDisconnect:
        await websocket.close()

위의 예제는 단순히 문자를 받고, 상대방에게 받은 문자가 무엇인지 다시 보내는 엔드포인트이다. HTTP 엔드포인트와 다른 점에 집중해서 살펴보자.

먼저, FastAPI에서 제공하는 websocket 데코레이터를 사용해서 엔드포인트를 생성했다. 일반적인 엔드포인트처럼 path에서 인자를 받아올 수 있지만, response_model, status code와 같은 인자는 받을 수 없다.

path operation 함수에서는 WebSocket 개체를 주입할 수 있다.

함수 내부에선 처음에 accept 메서드를 호출하였다. 이 메서드는 클라이언트에게 서버가 tunnel을 여는 것을 동의 한다고 알린다. 따라서 처음 웹소켓 엔드포인트를 구현할 때, 필수적으로 호출되어야 한다.

그다음 무한루프를 돌면서, 문자열 수신을 기다린다. 이는 HTTP 엔드포인트와 가장 다른 부분이다. tunnel이 닫힐 때까지 이 무한루프를 계속 돈다.

루프 내부에서는 receive_text를 먼저 호출한다. 이 메서드는 클라이언트로부터 메시지를 받아올 때까지 blocking 된다.

일반적인 blocking 되는 무한루프와 다르게, event loop를 활용하여, blocking 되는 동안 비동기적으로 다른 작업을 수행할 수 있다.

데이터가 수신되면, 우리가 수신했던 문자열을 확인하는 메세지를 클라이언트에게 전송한다.

그리고 여기서 try, except 구문을 사용하였는데, client disconnection에 대응하기 위해서 필수적으로 감싸주어야 한다. 클라이언트가 연결을 끊으면, receive_text 메서드가 error를 발생시키는데, 이때 발생하는 exception은 WebSocketDisconnect이다. 따라서 try, except 구문을 사용하고, 예외 처리의 마지막엔 서버의 웹소켓 연결도 닫는 websocket.close()를 호출해야한다.

책에서 제공해주는 소스코드 루트에서

$ uvicorn chapter8.echo.app:app

명령어로 서버를 실행한 다음, 간단한 HTML 페이지를 실행시키면

$ python -m http.server --directory chapter8/echo 9000

HTML 실행 결과

위와 같이 구현한 웹소켓을 테스트해 볼 수 있다!

이 예제에서 프론트엔드 구현은 자바스크립트로 이루어져 있다. 코드를 확인해보면 자바스크립트에서 제공하는 WebSocket 클래스를 이용해서 엔드포인트의 URL을 제공하면 우리가 원하는 엔드포인트와 웹소켓이 연결된다.

const socket = new WebSocket('ws://localhost:8000/ws');

// Connection opened

socket.addEventListener('open', function (event) {

  // Send message on form submission

  document.getElementById('form').addEventListener('submit', (event) => {

    event.preventDefault();

    const message = document.getElementById('message').value;

    addMessage(message, 'client');

    socket.send(message);

    event.target.reset();

  });

});

// Listen for messages

socket.addEventListener('message', function (event) {

  addMessage(event.data, 'server');

});

자바스크립트의 WebSocket API 문서에서 추가적인 정보를 확인할 수 있다.

Handling Concurrency

앞선 예제에서는 클라이언트가 먼저 메세지를 보내는 경우만을 생각했지만, 서버 측에서 클라이언트의 메세지를 기다리느라 blocking 된 도중에 서버측에서 클라이언트에게 메세지를 보내고 싶은 경우도 있을 것이다. 이러한 경우 어떻게 구현해야 하는지 알아보자.

이 문제를 해결하기 위해서 asyncio 모듈을 활용한다. 이 모듈은 여러 개의 coroutine을 스케줄링하는 함수를 제공하고 그중 하나가 끝날 때까지 기다릴 수 있게 한다.

앞선 예제의 경우, 클라이언트의 메세지를 기다리는 코루틴 1개, 메세지를 받은 다음 클라이언트에게 메세지를 전송하는 코루틴 1개가 존재한다.

이것을 더욱 명확하게 만든 다른 예제를 살펴보자.

import asyncio
from datetime import datetime

from fastapi import FastAPI, WebSocket, status
from starlette.websockets import WebSocketDisconnect

app = FastAPI()

async def echo_message(websocket: WebSocket):
    data = await websocket.receive_text()
    await websocket.send_text(f"Message text was: {data}")

async def send_time(websocket: WebSocket):
    await asyncio.sleep(10)
    await websocket.send_text(f"It is: {datetime.utcnow().isoformat()}")

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            echo_message_task = asyncio.create_task(echo_message(websocket))
            send_time_task = asyncio.create_task(send_time(websocket))
            done, pending = await asyncio.wait(
                {echo_message_task, send_time_task},
                return_when=asyncio.FIRST_COMPLETED,
            )
            for task in pending:
                task.cancel()
            for task in done:
                task.result()
    except WebSocketDisconnect:
        await websocket.close()

echo_message와 send_time 두 가지 함수가 구현되어 있고, 웹소켓 엔드포인트에서 각각 차례로 task를 만들어 수행된다.

echo_message, send_time 함수 모두 인자로 websocket 인스턴스를 받는다.

구현된 코드에서 가장 중요한 부분은 asyncio.create_task이다. 이 함수는 coroutineTask 객체로 변환한다. 그 결과, 우리가 coroutine의 실행 과정을 완전히 제어할 수 있게 된다.

task 객체는 asyncio.wait을 사용하기 위해 필수적이다. 이 wait 함수는 코루틴을 동시에(concurrently) 실행할 때 매우 유용하다. 첫 번째 인자로 실행할 task들의 집합(set)을 받는다. 기본 설정에 의하면 이 함수는 인자로 제공된 task들이 모두 완료될 때까지 blocking 된다. 그러나 우리가 return_when인자를 설정하여 task 중 1개가 끝나면 blocking 된 상태에서 벗어나서 이후의 코드를 수행할 수 있게 된 것이다. 반환 값으로는 done, pending 상태에 맞는 task set을 반환하게 된다.

각 set 내에 task 각각에 대해서 완료가 안된 경우(pending), iteration 마다의 코루틴이 쌓여서 문제가 생길 수 있기 때문에 task.cancle()을 수행한다. 완료된 경우(done) task.result()를 수행하는데, 이는 코루틴 수행 결과물을 반환하고, exception을 re-raise하는 역할을 한다. 클라이언트 쪽에서 연결이 끊기는 경우를 다룰 때 유용하다.

메세지를 10초간 입력하지 않으면 시간이 전송된다.

앞서서 서버와 HTML 페이지를 실행하던 명령어에서 echo를 concurrency로 바꾸면 새로운 예제를 실행할 수 있다.

의존성(dependency) 사용하기

WebSocket 엔드포인트도 미리 구현된 의존성을 사용할 수 있다. 하지만, 기존의 의존성들은 HTTP를 생각하며 만들어졌기 때문에 몇 가지 제약이 발생할 수 있다.

먼저 security dependency를 사용할 수 없다. 이 의존성들은 (HTTP) Request 객체를 의존성으로 받아서 동작하기 때문이다.

따라서, 책에서는 일단 WebSocket dependency 들을 optional로 설정하기를 권하고 있다. 그리고 missing value를 수동으로 다뤄야 한다.

@app.websocket("/ws")
async def websocket_endpoint(
    websocket: WebSocket,
    username: str = "Anonymous",
    token: Optional[str] = Cookie(None),
):
    if token != API_TOKEN:
        await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
        return

    await websocket.accept()
    await websocket.send_text(f"Hello, {username}!")
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"Message text was: {data}")
    except WebSocketDisconnect:
        await websocket.close()

위의 예시 코드에서는 username, token dependency를 주입하였다. 일반적인 HTTP 엔드포인트와 다르지 않다. 여기서 주목할 점은 Optional로 처리했거나 기본값을 설정해주었다는 점이다.

코드가 실행되면 처음에 Authentication을 진행한다. 만약 토큰이 우리가 원하는 값과 다르다면 웹소켓을 닫는다. 이때 status code를 함께 설정하며 닫는다(사용할 수 있는 status code 목록). 여기서는 가장 일반적인 1008 코드(Policy Violation)를 설정해주었다.

여러 개의 웹소켓 연결을 다루고, 메세지를 broadcasting 하기

노션의 mermaid로 그려본 그래프

간략하게는 위의 그래프처럼 웹소켓이 동작한다고 생각할 수 있다. 하지만 실제 배포된 다음에 동작하는 것을 생각해보면 서버 내에서도 여러 개의 Worker(process)가 존재해서 같은 process 안에서 WebSockect이 동작한다는 보장을 할 수 없다.

실제로는 아래와 같은 구조로 동작할 것이다.

Worker(프로세스)끼리 메세지 공유가 안된다.

하나의 Process에서 받은 메세지를 다른 Process로 전달해서 최종적으로는 BroadCasting을 정상적으로 수행해야 한다. 이 문제를 풀기 위해서 message broker를 사용한다. 나는 observer와 같이 publish-subscribe 패턴을 따른다. 잘 알려진 message broker 소프트웨어는 Apache Kafka, RabbitMQ 등과 클라우드 기반의 AWS, GCP, Azure 등에서 제공하는 Amazon MQ, Cloud Pub/Sub, Service Bus 등이 있다.

따라서 우리는 Message broker를 하나 정해서 우리 서버에 도입하면 될 것이다.

브로커를 통해서 서로 메세지가 공유된다.

이 단원에서는 Encode의 broadcaster 라이브러리와 Redis를 이용해서 간단한 Message broker를 구현한다.

$ pip install "broadcaster[redis]"

로 설치를 진행한다.

broadcast = Broadcast("redis://localhost:6379")
CHANNEL = "CHAT"

Redis 서버의 URL을 넣어서 broadcast 객체를 인스턴스화 했고, CHANNEL 상수를 선언한다. 이 예제에서는 단순히 “CHAT”이라고 설정했지만 실제로 사용하게 된다면 채널의 이름은 채팅방마다 다를 것이다.

class MessageEvent(BaseModel):
    username: str
    message: str

async def receive_message(websocket: WebSocket, username: str):
    async with broadcast.subscribe(channel=CHANNEL) as subscriber:
        async for event in subscriber:
            message_event = MessageEvent.parse_raw(event.message)
            # Discard user's own messages
            if message_event.username != username:
                await websocket.send_json(message_event.dict())

async def send_message(websocket: WebSocket, username: str):
    data = await websocket.receive_text()
    event = MessageEvent(username=username, message=data)
    await broadcast.publish(channel=CHANNEL, message=event.json())

MessageEvent 모델을 선언해주었다.

receive_message 함수는 broadcast 채널을 구독하고, 메세지를 수신하는 event를 기다린다. 그 다음 Pydantic에서 제공하는 parse_raw를 사용해서 JSON 형식의 메세지를 pydantic model 객체로 deserialize를 진행한다. 그 이후 username과 메세지 이벤트의 username을 비교하는데, 메세지를 보낸 유저에게는 broadcasting하면 같은 메세지가 반복되는 경우가 생기기 때문에 필요한 부분이다. 실제 앱에서는 유저 이름보다는 UID(user identifier)를 활용한다. 다른 유저가 보내온 메세지의 경우 웹소켓을 통해서 메세지를 broadcasting한다(웹소켓 클래스의 send_json 메서드 이용).

send_message 함수를 살펴보자. 자신과 웹소켓으로 연결된 클라이언트에게서 받은 메세지를 브로커에게 전달하는 역할을 한다. 새로운 데이터를 wait하다가 event 객체를 만들어서 publish 하는 것이다.

Message broker를 구현했으니 실제 웹소켓 엔드포인트를 마저 구현해보자.

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket, username: str = "Anonymous"):
    await websocket.accept()
    try:
        while True:
            receive_message_task = asyncio.create_task(
                receive_message(websocket, username)
            )
            send_message_task = asyncio.create_task(send_message(websocket, username))
            done, pending = await asyncio.wait(
                {receive_message_task, send_message_task},
                return_when=asyncio.FIRST_COMPLETED,
            )
            for task in pending:
                task.cancel()
            for task in done:
                task.result()
    except WebSocketDisconnect:
        await websocket.close()

여기서도 동시성을 보장하기 위해서 asyncio를 활용하여 task를 2개 생성한다. 그리고 1개의 task가 끝나면 wait을 멈추게 설정하여 실행한다. 앞서서 구현했던 웹소켓 엔드포인트와 매우 유사하다. 이 엔드포인트에서는 username을 query parameter로 받는다는 것에 주의한다.

마지막으로, FastAPI가 message broker에 정상적으로 연결되도록 만들어야 한다.

@app.on_event("startup")
async def startup():
    await broadcast.connect()

@app.on_event("shutdown")
async def shutdown():
    await broadcast.disconnect()

위에서 구현한 내용을 실행해보면 웹 브라우저의 탭을 여러 개 만들어서 여러 유저를 연결할 수 있다. 아래는 테스트 화면이다.

세명의 유저가 대화하고 있다.

요약

  • 웹소켓에 대해서 배워보았다.
  • 전통적인 HTTP 엔드포인트에 비해 다른 로직들이 존재했음
    • infinite loop에서 메세지를 기다린다
    • concurrency를 고려해야 한다.
    • asyncio를 적극 활용
  • 여러 클라이언트와 연결되는 경우
    • Message broker를 통해서 여러 프로세스의 메세지를 pub/sub 패턴으로 전달한다.
      • Apache Kafka, RabbitMQ 등의 기술이 있으니 필요하면 가져다 쓰기
반응형