개발/오늘 배운 지식

[FastAPI/Python] FastAPI에서의 인증과 보안

Woogie2 2022. 8. 11. 11:17
반응형

Chapter 7: FastAPI에서의 인증과 보안

Building Data Science Applications with FastAPI”의 챕터 7을 공부하며 정리한 내용입니다.

API에는 모든 이가 접근할 수 있는 Public API도 있지만, 인증된 사용자들만 접근 가능한 API가 대부분이다. 따라서 private token으로 접근성을 낮추거나, 적절한 인증 시스템을 구축해야 한다. 이 챕터에서는 FastAPI가 제공하는 보안과 관련된 의존성에 대해 다룬다.

또한 웹 브라우저에서 발생하는 CORS와 CSRF 공격에 대해서도 다룬다.

세부적인 내용들은 아래와 같다.

  • Security dependencies in FastAPI
  • Retrieving a user and generating an access token
  • Securing API endpoints for authenticated users
  • Securing endpoints with access token
  • Configuring CORS and protecting against CSRF attacks

Security dependencies in FastAPI

REST API, 더욱 일반론적으로 HTTP 엔드포인트를 보호하기 위해서는 많은 방법들이 존재한다.

일반적으로 자주 채택되는 방식은 아래와 같다.

  • Basic HTTP authentication: 이 방식에서는 유저의 credential들이 HTTP 헤더에 담기게 된다(Authorization). 값들은 Basic 키워드로 구성되어 있다. 그리고 그 값들은 Base64로 인코딩 된다. 하지만, 매 요청마다 유저의 암호 등이 담겨있기에 안전하다고 볼 수 없다.
  • Cookies: 클라이언트 단(유저가 사용하는 웹브라우저)에 정적인 데이터를 저장하는 방식이다. 쿠키는 서버에서 각 유저에게 부여하는 세션 토큰을 지니고 있는 경우가 많다.
  • Tokens in the Authorization header: 가장 자주 쓰이는 헤더이다. Authorization 헤더에 토큰을 담고 있다. 토큰은 Bearer와 같은 method keyword에 의한 접두사를 가지고 있다. 이 토큰은 서버 측에서 검증하고, 개별 유저에 링크된다.

FastAPI에서는 대부분의 방식을 즉시 사용할 수 있다(의존성을 이용해서).

from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import APIKeyHeader

API_TOKEN = "SECRET_API_TOKEN"

app = FastAPI()
api_key_header = APIKeyHeader(name="Token")

@app.get("/protected-route")
async def protected_route(token: str = Depends(api_key_header)):
    if token != API_TOKEN:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
    return {"hello": "world"}

여기서 사용된 APIKeyHeader는 security dependency이다. 이 의존성은 name에 해당하는 헤더의 값을 받아온다.

구현된 코드에서 토큰이 일치하지 않는 경우 HTTP_403_FORBIDDEN error를 발생시킨다.

이렇게 보안과 관련된 의존성을 따로 구현하는 이유를 책에서는 두 가지로 설명한다.

  1. 헤더가 존재하는지 아닌지를 확인하는 과정을 의존성을 분리시키고, error handling 과정을 분리한다. 따라서 우리가 구현한 코드에는 헤더가 일치하지 않는 경우만 error를 발생시키는 것이다.
  2. 더 중요한 점은, OpenAPI schema에 감지되기 때문에, 자동 생성되는 문서에 연관된 정보가 포함된다는 점이다. 이 엔드포인트가 보호되고 있는지, 어떤 헤더를 요구하는지 등을 보여줄 수 있다.
async def api_token(token: str = Depends(APIKeyHeader(name="Token"))):
    if token != API_TOKEN:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)

@app.get("/protected-route", dependencies=[Depends(api_token)])
async def protected_route():
    return {"hello": "world"}

이러한 의존성들은 라우터 전체나 app 전체를 보호할 수 있는 좋은 의존성들이다. 하지만 이 방식이 완벽히 안전하다고 말할 수는 없다. 먼저, API가 항상 HTTPS를 이용하여 제공되고 있는지, 그리고 헤더내에서 보안 토큰이 노출되지 않는지 등을 확인하여야 한다.

그다음, 만약 private 한 마이크로 서비스라면, 인증되고 신뢰할 수 있는 서버만 API를 호출할 수 있도록 해야 한다. 즉, 유저에게는 노출시키지 않는 것이 좋다.

대부분의 서비스들은 아래와 같은 패턴으로 구성된다.

  • 유저에게 이메일과 패스워드를 받아서 서비스에 등록한다
  • 이를 서버에 기록하고, 이 정보들이 유효한지 서비스에서 확인한다.
  • 이후 서버 측에서 세션 토큰을 제공하고, 유저는 이후의 요청에서 세션 토큰을 이용하여 인증을 진행한다. 이 세션 토큰은 대부분 유효 기한이 정해져 있다.

유저의 암호를 DB에 안전하게 저장하기

비밀번호의 저장은 다른 정보보다 보안에 신경 써야 한다. 그래서 일반적인 텍스트로 저장하는 것이 아닌 cryptographic hash function을 이용해서 비밀번호를 암호화할 수 있다.

이제 FastAPI와 Tortoise ORM을 이용해서 구현해보자.

모델과 테이블 스키마 만들기

class UserBase(BaseModel):
    email: EmailStr

    class Config:
        orm_mode = True

class UserCreate(UserBase):
    password: str

class User(UserBase):
    id: int

class UserDB(User):
    hashed_password: str

DB에 저장되는 데이터 모델의 경우 해싱된 패스워드(hased_password)가 저장된다.

class UserTortoise(Model):
    id = fields.IntField(pk=True, generated=True)
    email = fields.CharField(index=True, unique=True, null=False, max_length=255)
    hashed_password = fields.CharField(null=False, max_length=255)

    class Meta:
        table = "users"

여기서 email 필드는 unique=True로 설정하여, 중복되는 이메일이 저장되지 않도록 강제하였다.

비밀번호 해싱하기

passlib에 있는 bcrypt 의존성을 사용하기 위해서 다음과 같은 명령어로 설치한다.

pip install 'passlib[bcrypt]'

password.py를 생성 후 아래와 같이 구현한다.

import secrets

from passlib.context import CryptContext

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

def get_password_hash(password: str) -> str:
    return pwd_context.hash(password)

def verify_password(plain_password: str, hashed_password: str) -> bool:
    return pwd_context.verify(plain_password, hashed_password)

def generate_token() -> str:
    return secrets.token_urlsafe(32)

CryptContext는 다양한 종류의 해시 알고리즘을 사용할 수 있는 클래스이다.

회원가입 라우트 구현하기

유저에게 새로 설정한 비밀번호를 받고, 서버에서 해싱을 한 비밀번호를 DB에 저장하도록 구현해야 한다.

app.post("/register", status_code=status.HTTP_201_CREATED)
async def register(user: UserCreate) -> User:
    hashed_password = get_password_hash(user.password)

    try:
        user_tortoise = await UserTortoise.create(
            **user.dict(), hashed_password=hashed_password
        )
    except IntegrityError:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail="Email already exists"
        )

    return User.from_orm(user_tortoise)

앞서 password 패키지에 구현해둔 함수를 이용한다. 여기서 IntegrityError는 이미 존재하는 이메일을 등록하려 할 때 발생한다. 또한 여기서 반환하는 타입을 보면, UserDB 모델이 아닌 User모델임을 확인할 수 있다. 이는 해싱된 비밀번호가 유출되지 않도록 하는 구현 방식이다.

다음으로는 로그인하는 엔드포인트를 구현해야 한다. 유저가 credential들(email, password)을 request payload에 담아서 보내면, 서버에서 검증을 진행한다. 만약, 이 정보들이 올바르다면 access token을 발행하여 응답에 담아서 유저에게 전송한다.

DB 엑세스 토큰 구현하기

먼저, 이 엑세스 토큰은 특정 유저를 구별할 수 있어야 하고, 제 3자의 위조를 방지해야 한다. 이를 위해 이 섹션에서는 임의의 문자열을 생성하고, 이 토큰을 위한 table을 DB에 생성하는 방식으로 구현한다. 그리고 foreign key로 user를 참조하도록 만든다.

class AccessToken(BaseModel):
    user_id: int
    access_token: str = Field(default_factory=generate_token)
    expiration_date: datetime = Field(default_factory=get_expiration_date)

    class Config:
        orm_mode = True

이 모델은 Pydantic data model이고 FastAPI 엔드포인트가 반환하는 형태를 정의한다.

  • access_token은 앞서 구현한 함수를 default_factory로 설정하였다.
  • expiration_date는 주로 현 시간부터 24시간 뒤로 설정하는 것이 일반적이다.

이후 DB에 저장되는 데이터 모델도 추가로 정의해야 한다.

class AccessTokenTortoise(Model):
    access_token = fields.CharField(pk=True, max_length=255)
    user = fields.ForeignKeyField("models.UserTortoise", null=False)
    expiration_date = fields.DatetimeField(null=False)

    class Meta:
        table = "access_tokens"

이제 로그인을 처리하는 엔드포인트를 구현해보자.

@app.post("/token")
async def create_token(
    form_data: OAuth2PasswordRequestForm = Depends(OAuth2PasswordRequestForm),
):
    email = form_data.username
    password = form_data.password
    user = await authenticate(email, password)

    if not user:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)

    token = await create_access_token(user)

    return {"access_token": token.access_token, "token_type": "bearer"}

FastAPI에서 제공되는 OAuth2PasswordRequestForm 의존성을 주입한다. 이 의존성은 username, password 등의 인자를 받는다. 이 클래스를 이용하는 이유는 OpenAPI schema와 통합하기 위함이다. 그리고 response에 담긴 access token을 이후 이어지는 request header에 자동적으로 설정해준다는 이점이 있다.

이 클래스는 OAuth2 프로토콜을 따른다. 이 프로토콜은 client ID와 secret field를 가지고 있다. 이를 FastAPI에서는 쉽게 사용할 수 있도록 미리 구현해 둔 것이다.

받아온 email과 password를 이용해 user 정보를 받아오는 authenticate 함수를 보면,

async def authenticate(email: str, password: str) -> Optional[UserDB]:
    try:
        user = await UserTortoise.get(email=email)
    except DoesNotExist:
        return None

    if not verify_password(password, user.hashed_password):
        return None

    return UserDB.from_orm(user)

DB로부터 원하는 email을 가지고 있는 유저 정보를 찾고, 비밀번호를 hashed_password와 비교 검증한다. 그리고 Pydantic Model로 변환하여 반환한다.

이 챕터의 예제에서는 단순한 passlib의 bcrypt를 이용하여 해시 값을 비교했지만, 실제로는 password 해시를 더욱 업그레이드하여야 한다. 따라서 다른 해시 함수를 이용하고 싶을 경우 링크를 참조하여 진행하도록 한다.

엑세스 토큰을 이용하여 엔드포인트 여러 개 보호하기

async def get_current_user(
    token: str = Depends(OAuth2PasswordBearer(tokenUrl="/token")),
) -> UserTortoise:
    try:
        access_token: AccessTokenTortoise = await AccessTokenTortoise.get(
            access_token=token, expiration_date__gte=timezone.now()
        ).prefetch_related("user")
        return cast(UserTortoise, access_token.user)
    except DoesNotExist:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)

여기서 사용한 OAuth2PasswordBearer는 앞선 섹션에서 봤던 OAuth2PasswordRequestForm와 밀접한 연관이 있다. 여기서는 OpenAPI 스키마에 새 토큰을 가져올 엔드포인트가 “/token”임을 알려준다. 이후 AccessTokenTortoise Table에서 token과 일치하는 값을 가지는 entity를 가져오고, expiration_date__gte를 통해서 현재 시각보다 만료 시각이 더욱 나중이라는 것도 확인한다. __gtefilter modifier이며 “greater than or equal to” 라는 의미를 가진다. 나머지는 modifier 들은 Tortoise 공식 문서에서 확인할 수 있다. 원하는 유저 정보가 없다면, 401 오류를 발생시키면 된다.

마지막으로 전체적인 route를 구현하면 인증 시스템 구현이 완료된다.

@app.get("/protected-route", response_model=User)
async def protected_route(user: UserDB = Depends(get_current_user)):
    return User.from_orm(user)

하지만, 대부분의 REST API는 브라우저에서 많이 호출되므로 CORS 설정과 CSRF 공격에 대해서도 대비해야 한다.

CORS 설정과 CSRF 공격을 막기

최근 몇 년간 자바스크립트의 빠른 성장으로 인해서 Frontend와 Backend로 명확히 구분되는 추세이다. 따라서 Backend는 데이터를 저장하고, 수신하거나 비즈니스 로직을 수행한다. 하지만 여전히 인증 과정을 수행해야 하는데, cookie로 이 과정을 진행하면 더욱 좋다.

쿠키는 브라우저의 메모리에 유저의 정보를 저장하도록 설계되었다. 그리고 매번 요청을 보낼 때, 서버에 자동적으로 전송된다. 그러나 쿠키는 편리한 만큼 해커들의 표적이 되기 쉽고, 이미 수년간 많은 공격을 받아왔다.

가장 흔한 것이 Cross-Site Request Forgery(CSRF) 라고 불리는 공격이다. 이 공격의 시나리오를 살펴보면, 우리의 웹사이트에 최근에 인증한 사람을 자신들의 웹사이트(가짜)로 끌어들여 속이려는 시도이다.

CORS에 대한 이해와 FastAPI에서 올바르게 설정하기

보통 프론트엔드와 백엔드를 분리했다면, 같은 sub-domain에서 serving 되진 않는다. 보통 frontend는 www.example.com, backend는 api.example.com과 같은 도메인에서 서빙된다.

그러나, 브라우저들은 cross-origin HTTP requests를 허용하지 않는다. 도메인 A는 도메인 B에 요청을 보낼 수 없다는 뜻이다. 이것은 same-origin policy에 따른 것이다. 이 규칙은 CSRF 공격을 막을 수 있는 방법 중 하나이다.

자세한 내용은 링크 참조

특히나 책에서는 simple request(GET,POST,HEAD)의 경우 브라우저에서는 정상적으로 request가 발송되고, 서버에서 수신 및 응답이 잘 동작한다고 한다. 그러나 브라우저에서는 응답을 제대로 보여주지 않고, 오류를 발생시킨다. 따라서 이 simple request들의 경우 same-origin policy 외에 다른 방식이 추가되어야 한다.

이럴 때는 preflight 쿼리를 통해서 제약 사항을 완화시키는 것이 필요하다. 이는 도메인 A에서 도메인 B를 호출하는 등의 동작을 가능하게 만든다. 따라서 우리가 원치 않는 동작이 허용되는 것을 막기 위해 세심한 설정이 필요하다.

다행히도, FastAPI에는 관련 설정이 비교적 쉽다. CORSMiddleware를 import 해서 사용하면 된다(Starlette에서 제공).

from fastapi import FastAPI, Request
from starlette.middleware.cors import CORSMiddleware

app = FastAPI()

app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:9000"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
    max_age=-1,  # Only for the sake of the example. Remove this in your own project.
)

이렇게 미들웨어를 추가해서 허용되는 origin과 메서드, 헤더 등을 설정해줄 수 있다. 여기서 추가하는 미들웨어는 ASGI 애플리케이션이 path operation을 통해 요청을 처리하기 전, 후에 수행되는 global logic이다.

여기서 CORSMiddleware는 prefilight request들을 받아서 적절한 CORS 헤더(설정에 맞는)를 응답으로 보낸다.

  • 가장 중요한 설정값은 allow_origins이다. API에 request를 보낼 수 있는 도메인의 목록을 뜻한다. -
  • allow_credentials를 살펴보자. 브라우저는 cross-origin HTTP request에서는 쿠키를 전송하지 않는다. 만약, authenticated request들을 우리의 API에 보내려면 이 옵션을 켜야 한다. CORS 미들웨어 설정에 대한 설명 링크
  • max_age: cache duration of the CORS response. 이 설정을 통해 preflight 요청의 비율을 줄여서 성능 향상을 꾀할 수 있다. 이 예제에서는 -1 값을 설정해 비활성화한 상태이다.

여기서 아직 해결하지 못한 점은 simple request의 경우, POST 요청이 허용되고 있다는 점이다. 그리고 response를 읽을 수 없더라도 서버에서는 정상적으로 수행되는 경우도 있다는 점이다.

이제, double-submit cookie와 같은 방법으로 CSRF 공격을 막는 법을 배운다.

Double-submit cookies 구현하기

브라우저의 쿠키에만 의존하여 유저의 credential을 저장하면, 브라우저가 자동적으로 쿠키를 서버에 보내는 특성 때문에 CSRF 공격의 위험에 노출된다. 특히나 브라우저가 CORS 규칙을 강제하지 않는 simple request라고 판단하는 경우에 발생한다. 전통적인 HTML 폼 제출, 이미지 태그의 src attribute 등에서도 발생할 수 있다.

이러한 이유로 우리는 또 다른 계층의 보안 시스템을 만들어야 한다. 이 섹션에서 다루는 것은 API가 웹에서 사용되고 authentication에 쿠키를 사용하는 경우에만 사용되는 방법이다.

starlette-csrf라는 라이브러리를 pip으로 설치한다.

@app.post("/login")
async def login(response: Response, email: str = Form(...), password: str = Form(...)):
    user = await authenticate(email, password)

    if not user:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)

    token = await create_access_token(user)

    response.set_cookie(
        TOKEN_COOKIE_NAME,
        token.access_token,
        max_age=token.max_age(),
        secure=True,
        httponly=True,
        samesite="lax",
    )

위의 코드에서 미리 알아둬야 할 점은 쿠키에서 Secure, HttpOnly플래그를 사용했다는 점이다. 이 설정은 HTTPS 연결을 통해서만 전송되고 자바스크립트는 값을 읽을 수 없도록 만든다. 민감한 정보를 다룰 때는 중요한 설정이다.

그리고 samesite 플래그는 lax로 설정했다. 이건 꽤 최근에 나온 플래그이고, cross-origin 상황에서 어떻게 쿠키가 전송되는지 관리할 수 있게 해 준다. lax는 대부분의 브라우저에서 기본으로 사용되는 값이다. 쿠키가 sub-domain들에 전송될 수는 있지만, 아예 다른 도메인에는 전송되지 않도록 하는 설정이다.

이제, 인증된 유저를 확인할 때, 그냥 Request에 담긴 쿠키로부터 토큰을 가져와서 확인하면 된다.

FastAPI에서는 APIKeyCookies라는 의존성을 제공한다.

sync def get_current_user(
    token: str = Depends(APIKeyCookie(name=TOKEN_COOKIE_NAME)),
) -> UserTortoise:
    try:
        access_token: AccessTokenTortoise = await AccessTokenTortoise.get(
            access_token=token, expiration_date__gte=timezone.now()
        ).prefetch_related("user")
        return cast(UserTortoise, access_token.user)
    except DoesNotExist:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)

나머지 코드는 앞서서 구현했던 것과 같다. 이제 email 주소를 업데이트하는 엔드포인트를 만들어보자.

@app.post("/me", response_model=User)
async def update_me(
    user_update: UserUpdate, user: UserTortoise = Depends(get_current_user)
):
    user.update_from_dict(user_update.dict(exclude_unset=True))
    await user.save()

    return User.from_orm(user)

이 엔드포인트는 POST 요청이기 때문에 CSRF 공격에 노출되어있다. 그 이유는 POST의 경우, 추가적인 헤더를 설정하지 않는다면 simple request로 간주해서 브라우저에서는 오류가 발생하더라도 서버에서는 정상적으로 수행될 수 있다. 따라서 공격자가 우리 서버에 인증한 유저의 이메일 주소를 임의로 바꿀 수도 있는 것이다.

여기가 double submit cookie pattern이 필요한 지점이다.

  1. 유저가 첫 번째 요청을 안전하다고 여겨지는 method(예: GET)로 전송한다.
  2. 응답에 secret random value(CSRF token)를 담아서 client(user)에게 보낸다.
  3. POST와 같은 안전하지 않은 요청을 보낼 때, 유저는 쿠키 내의 CSRF 토큰을 읽어서 같은 값을 헤더에 담아 보낸다. 브라우저는 메모리에 저장된 쿠키를 요청에 담아서 보내기 때문에, Request에는 쿠키와 헤더 두 곳에 token이 담겨있다. 이렇기 때문에 double submit 이라고 부른다.
  4. 들어온 요청을 처리하기 전에, 서버는 헤더와 쿠키 내의 토큰을 비교한다. 두 토큰이 같아야 요청을 수행하고, 다르다면 오류를 발생시켜야 한다.

이 방식은 두 가지 이유를 들어 안전하다고 볼 수 있다.

  • 공격자는 제3의 웹사이트에 있으므로 쿠키를 읽을 수 없다. 따라서 그들은 CSRF 토큰 값을 수신할 방법이 없는 것이다.
  • 커스텀 헤더를 추가하는 것은 “simple requests”의 조건에 위배되기 때문에, 브라우저는 preflight request를 보내게 된다. 이는 CORS 규칙을 강제할 수 있다는 말이 된다.

이 방식을 미들웨어를 통해서 구현하면 다음과 같다.

from starlette_csrf import CSRFMiddleware

TOKEN_COOKIE_NAME = "token"
CSRF_TOKEN_SECRET = "__CHANGE_THIS_WITH_YOUR_OWN_SECRET_VALUE__"

app.add_middleware(
    CSRFMiddleware,
    secret=CSRF_TOKEN_SECRET,
    sensitive_cookies={TOKEN_COOKIE_NAME},
    cookie_domain="localhost",
)
  • secret: secret value를 설정
  • sensitive_cookies: 쿠키 이름을 설정해서 해당되는 쿠키가 존재하면, CSRF-protection을 활성화하고, 없는 경우는 CSRF check을 생략할 수 있다.
  • cookie_domain: 쿠키 도메인을 설정하면, 설정한 도메인과 서브 도메인에서 쿠키를 받아올 수 있게 해 준다. cross-origin 상황에서 필요하다.

CSRF 공격을 막는 방법을 배웠다. 대부분 라이브러리에서 제공하는 미들웨어를 통해 처리되었다(starlette). 그래서 자동으로 생성되는 interactive documentation에 문제를 발생시킨다는 단점이 있다.

OWASP Cheat Sheet Series에서 웹 애플리케이션에서 발생할 수 있는 여러 보안 문제들을 짚어볼 수 있다.

반응형