개발/오늘 배운 지식

[FastAPI/Python] FastAPI를 Tortoise ORM과 MongoDB와 연동하기

Woogie2 2022. 8. 9. 19:33
반응형

| "Building Data Science Applications with FastAPI"를 읽고 정리한 글입니다.

Tortoise ORM 로고(?)

Tortoise ORM 사용하기

  • ORM 툴을 사용하는 이유: 관계형 DB를 다룰 때, SQL 개념을 추상화하고, 오직 python의 object만 다뤄서 복잡도를 낮추고 싶기 때문이다
  • Tortoise ORM: Modern and asynchronous ORM
  • Django ORM과 유사하다.

각 DB 엔진에 따른 설치법은 링크에서 확인할 수 있다.

DB 모델 생성하기

Tortoise ORM의 역할은 파이썬 객체와 DB의 entity를 연결(link)하는 것이다.

class PostTortoise(Model):

    id = fields.IntField(pk=True, generated=True)

    publication_date = fields.DatetimeField(null=False)

    title = fields.CharField(max_length=255, null=False)

    content = fields.TextField(null=False)

    class Meta:

        table = "posts"

위의 모델은 tortoise.models.Model 클래스를 상속받아서 구현하였다. 클래스의 필드(DB Entity의 Column에 대응)는 각 데이터 타입에 맞는 필드 객체이다. 이제 이에 적절히 대응되는 Pydantic model을 수정해주어야 한다.

class PostBase(BaseModel):

    title: str

    content: str

    publication_date: datetime = Field(default_factory=datetime.now)

    class Config:

        orm_mode = True

sub-classorm_mode를 설정하였다. 이 옵션은 ORM 객체를 pydantic object로 변경할 수 있게 해 준다. 이는 사실 필수적인데, FastAPI는 orm object가 아닌 pydantic model을 다루도록 디자인되었기 때문이다. 따라서 ORM과 Pydantic 객체 사이에서 변환이 많이 일어나기 때문에 헷갈리는 부분이다.

Tortoise engine 세팅하기

TORTOISE_ORM = {

    "connections": {"default": "sqlite://chapter6_tortoise.db"},

    "apps": {

        "models": {

            "models": ["chapter6.tortoise.models"],

            "default_connection": "default",

        },

    },

}

register_tortoise(

    app,

    config=TORTOISE_ORM,

    generate_schemas=True,

    add_exception_handlers=True,

)
  • connections: 딕셔너리 형태로 DB 들을 적어둔다.
  • apps: Tortoise model들을 적어두는 키 값이다.
  • register_tortoise(FastAPI app 인스턴스, config = ORM config, generate_schemas = 자동으로 스키마 생성 여부, add_exception_handlers= 커스텀 exception handler)

객체 만들기

inserting new objects inside our DB.

  • Main Challenge: Tortoise 객체 인스턴스를 Pydantic Model로 변환하는 것
@app.post("/posts", response_model=PostDB, status_code=status.HTTP_201_CREATED)
async def create_post(post: PostCreate) -> PostDB:
    post_tortoise = await PostTortoise.create(**post.dict())
    return PostDB.from_orm(post_tortoise)

PostTortoise 클래스의 static create 메서드를 이용해서, DB에 객체를 생성한다.

PostDB.from_orm 메서드를 통해서 변환할 수 있는데, 아까 sub-class로 config 클래스를 선언하고 orm_mode=True 로 설정해주었기에 가능한 것이다. 항상 FastAPI가 Pydantic 모델을 반환해야하는 이유는 automatic documentation 때문이다.

객체를 필터링하고 가져오기

@app.get("/posts")

async def list_posts(pagination: Tuple[int, int] = Depends(pagination)) -> List[PostDB]:

    skip, limit = pagination

    posts = await PostTortoise.all().offset(skip).limit(limit)

    results = [PostDB.from_orm(post) for post in posts]

    return results
  1. Tortoise object를 쿼리문으로 가져오기.
  2. PostTortoise → PostDB로 변환하기(ORM object → Pydantic Object)
@app.get("/posts/{id}", response_model=PostDB)
async def get_post(post: PostTortoise = Depends(get_post_or_404)) -> PostDB:

    return PostDB.from_orm(post)

async def get_post_or_404(id: int) -> PostTortoise:

    return await PostTortoise.get(id=id)

여기서 exception handler는 어디에 있는지 궁금할 수 있다. 처음에 설정할 때, add_exception_handler = True 로 세팅했기 때문에 Global level에서 자동적으로 404 Error 핸들러가 추가되었다.

객체 수정하기와 삭제하기

@app.patch("/posts/{id}", response_model=PostDB)
async def update_post(
    post_update: PostPartialUpdate, post: PostTortoise = Depends(get_post_or_404)
) -> PostDB:
    post.update_from_dict(post_update.dict(exclude_unset=True))
    await post.save()
    return PostDB.from_orm(post)

update endpoint에서는 직접적으로 DB object를 수정하고 저장한다. 즉, entity를 우리가 원하는 대로 수정할 수 있다는 점이 ORM을 사용하는 핵심적인 이유 중 하나이다. update_from_dict(), save() method 이용.

@app.delete("/posts/{id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_post(post: PostTortoise = Depends(get_post_or_404)):
    await post.delete()

Relationship 추가하기

  • ORM을 이용하는 가장 큰 이유: entity 끼리 realtionship을 쉽게 설정하기 위해.
class CommentTortoise(Model):

    id = fields.IntField(pk=True, generated=True)

    post = fields.ForeignKeyField(

        "models.PostTortoise", related_name="comments", null=False

    )

    publication_date = fields.DatetimeField(null=False)

    content = fields.TextField(null=False)

    class Meta:

        table = "comments"

related_name=”comments” 옵션 덕분에 post에서 자신과 연관된 comments들에 한 번에 접근할 수 있다.

Pydantic Model

class CommentBase(BaseModel):

    post_id: int

    publication_date: datetime = Field(default_factory=datetime.now)

    content: str

    class Config:

        orm_mode = True

여기서 post_id 라고 표기해두면, Tortoise에서 자동적으로 post 라는 foreign key의 id라고 이해할 수 있다.

class PostPublic(PostDB):

    comments: List[CommentDB]

    @validator("comments", pre=True)

    def fetch_comments(cls, v):

        return list(v)

여기서 validator가 등장하는데, Pydantic Model의 빌트인 validation이 실행되기 전에 validation을 진행하고, list(v)를 통해서 query set을 리스트로 변환해준다.

@app.post("/comments", response_model=CommentDB, status_code=status.HTTP_201_CREATED)

async def create_comment(comment: CommentBase) -> CommentDB:

    try:

        await PostTortoise.get(id=comment.post_id)

    except DoesNotExist:

        raise HTTPException(

            status_code=status.HTTP_400_BAD_REQUEST, detail=f"Post {id} does not exist"

        )

    comment_tortoise = await CommentTortoise.create(**comment.dict())

    return CommentDB.from_orm(comment_tortoise)
  • foreign key constraint 에러를 피하기 위해서 처음에 Post가 존재하는지 확인한다.
  • 그다음 코멘트를 생성하는 것이다.

다음과 같은 코드를 통해서 post와 관련된 comments를 불러올 수 있다.

async def get_post_or_404(id: int) -> PostTortoise:

    try:

        return await PostTortoise.get(id=id).prefetch_related("comments")

    except DoesNotExist:

        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)

prefetch_related는 관련된 entity의 이름을 인자로 받는다. 이 부분을 수행할 때, query set을 parsing 하려는 등의 오류가 발생할 수 있기 때문에 exception handling을 구현해주어야 안전하다.

DB migration with Aerich

Tortoise 제작자들이 만든 DB 마이그레이션 툴이다. pip를 이용해서 라이브러리를 설치한다.

TORTOISE_ORM = {

    "connections": {"default": "sqlite://chapter6_tortoise_relationship.db"},

    "apps": {

        "models": {

            "models": ["chapter6.tortoise_relationship.models", "aerich.models"],

            "default_connection": "default",

        },

    },

}

Tortoise 설정에 aerich.models를 추가한다.

$ aerich init -t chapter6.tortoise_relationship.app.TORTOISE_ORM
$ aerich init-db

와 같이 수행할 수 있다. 각 명령어를 순차적으로 실행하자.

$ aerich upgrade

위의 명령어를 실행하면 DB migration이 적용된다.

$ aerich migrate --name added_new_tables

Schema를 변경하고, 이걸 migration script로 생성하려면 위의 명령어를 실행한다.

AERICH MIGRATION SCRIPTS ARE NOT CROSS-DATABASE COMPATIBLE

Alembic과는 다르게 Aerich는 여러 데이터베이스를 지원하지 않는다. 대신에 SQL file을 직접적으로 생성한다(처음에 설정한 DB 엔진에 맞는). 따라서 개발용과 출시용 DB 엔진을 통일해야 한다.

Motor를 이용해서 MongoDB 사용하기

위에서 설명했던 내용들은 모두 관계형 데이터베이스에 대한 내용들이다.

하지만, 지금부터는 문서-지향적 데이터베이스에 관한 내용이 이어진다.

  • Motor: 몽고 DB와 비동기적으로 통신할 수 있게 해주는 라이브러리이다. pip으로 설치하자.

MongoDB ID와 호환되는 모델 만들기

MongoDB는 _id 라는 프로퍼티로 collection 내의 document를 구별한다. 이것은 두 가지의 문제점을 가져온다고 한다.

  1. Pydantic 모델 내에서 프로퍼티의 이름이 언더스코어(_)로 시작할 경우 private으로 간주하기 때문에 model의 data field로 사용할 수 없다.
  2. _id는 바이트 객체로 인코딩(ObjectId) 되기 때문에 간단한 정수나 string이 아닌 임의의 복잡한 string으로 저장된다. 따라서 이런 종류의 string은 Pydantic이나 FastAPI에서 지원되지 않는다.

따라서 추가적인 boilerplate 코드를 도입해서 identifier가 정상적으로 FastAPI와 동작할 수 있게 만들어야 한다.

class MongoBaseModel(BaseModel):

    id: PyObjectId = Field(default_factory=PyObjectId, alias="_id")

    class Config:

        json_encoders = {ObjectId: str}

idPyObjectId타입으로 선언한다.

class PyObjectId(ObjectId):
    @classmethod
    def __get_validators__(cls):
        yield cls.validate

    @classmethod
    def validate(cls, v):
        if not ObjectId.is_valid(v):
            raise ValueError("Invalid objectid")
        return ObjectId(v)

    @classmethod
    def __modify_schema__(cls, field_schema):
        field_schema.update(type="string")

이 타입은 Pydantic와 호환되도록 추가로 구현한 타입이다.

첫 번째 코드 블록에서 눈여겨 볼만한 부분은 alias이다. Pydantic에서 제공하는 옵션 중 하나인데, field의 이름을 serialization 도중에 변경할 수 있도록 한다(id_id). 이 기능 덕분에 첫 번째 문제점을 해결할 수 있다.

sub-class로 Config를 선언했기 때문에, 커스텀 타입을 같이 구현된 커스텀 함수와 함께 구현된 함수를 호출해서 직렬화한다. 이 방법으로 두 번째 이슈를 해결할 수 있다.

이제, Pydantic에서 제공하는 ~~BaseModel~~이 아닌 MongoBaseModel을 활용해서 데이터 모델을 만들면 정상적으로 작동한다.

DB에 연결하기

app.py에 motor client를 생성하고 MongoDB 서버와 연결하자.

motor_client = AsyncIOMotorClient("mongodb://localhost:27017")  # Connection to the whole server

database = motor_client["chapter6_mongo"]  # Single database instance

def get_database() -> AsyncIOMotorDatabase:

    return database

자세한 설정법은 링크 참조.

주의할 점은 여기서 지칭하는 motor_client는 진짜 데이터베이스가 아닌, MongoDB와의 연결을 뜻한다. 따라서 2번째 줄처럼 객체 인스턴스화를 진행해주어야 한다. 마지막 부분은 의존성을 구현한 부분이다. 이는 Unit Test를 쉽게 만들어준다.

Inserting Document

@app.post("/posts", response_model=PostDB, status_code=status.HTTP_201_CREATED)
async def create_post(
    post: PostCreate, database: AsyncIOMotorDatabase = Depends(get_database)
) -> PostDB:
    post_db = PostDB(**post.dict())
    await database["posts"].insert_one(post_db.dict(by_alias=True))

    post_db = await get_post_or_404(post_db.id, database)

    return post_db

의존성으로 데이터 베이스를 주입받고, postCreate 모델에 맞게 payload를 받는다.

database[”posts”]는 데이터베이스 내의 posts collection에 접근하는 부분이고, 여기서 insert_one 함수를 이용해 문서를 하나 삽입하게 된다. 여기서 by_alias=True 라는 옵션을 통해서, 우리가 임의로 변경한 _id 라는 alias name을 dictionary key로 사용할 수 있게 한다.

그 이후, 제대로 저장되었는지 DB에서 값을 받아와서 확인하는 과정을 거치면 된다.

Document 가져오기(Get)

  • 리스트로 받아오기
  • 하나의 데이터 가져오기
@app.get("/posts")
async def list_posts(
    pagination: Tuple[int, int] = Depends(pagination),
    database: AsyncIOMotorDatabase = Depends(get_database),
) -> List[PostDB]:
    skip, limit = pagination
    query = database["posts"].find({}, skip=skip, limit=limit)

    results = [PostDB(**raw_post) async for raw_post in query]

    return results

list를 받아오는 함수를 보면 collection을 가져온 이후 find 메서드를 사용한다. 이 메서드의 첫 번째 인자로는 MongoDB의 filtering 쿼리이다. 여기서는 모든 document를 원하기 때문에 빈칸으로 남겨둔다. pagenation을 위한 인자를 키워드 인자로 넘겨주었다.

이렇게 쿼리를 진행하면 MongoDB는 딕셔너리를 담고 있는 리스트를 반환하게 된다. 따라서 PostDB 객체로 변환하는 과정이 list 내에서 필요하다.

여기서 눈여겨볼 점은 query 전체를 기다리지 않았다는 점이다. 대신에 async 키워드를 list comprehension에 붙였다. 이 이유는 Motorasynchronous generator를 반환하기 때문이다.

@app.get("/posts/{id}", response_model=PostDB)
async def get_post(post: PostDB = Depends(get_post_or_404)) -> PostDB:
    return post

get_post_or_404 dependency를 재사용한다.

async def get_post_or_404(
    id: ObjectId = Depends(get_object_id),
    database: AsyncIOMotorDatabase = Depends(get_database),
) -> PostDB:
    raw_post = await database["posts"].find_one({"_id": id})

    if raw_post is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)

    return PostDB(**raw_post)

리스트로 받아올 때와 유사하지만, 여기서는 find_one method를 이용한다. 주의해야 되는 부분은 서두에 말했던 MongoDB의 id의 특성을 잊으면 안 된다는 점이다.

일반적인 id가 아니기 때문에, binary ID를 찾는 nested dependency를 추가로 구현해서 주입해주어야 한다.

async def get_object_id(id: str) -> ObjectId:
    try:
        return ObjectId(id)
    except (errors.InvalidId, TypeError):
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)

이로써 MongoDB의 id로 발생하는 문제를 해결하였다.

문서 업데이트 및 삭제하기

먼저 patch 엔드포인트로 선언한다.

@app.patch("/posts/{id}", response_model=PostDB)
async def update_post(
    post_update: PostPartialUpdate,
    post: PostDB = Depends(get_post_or_404),
    database: AsyncIOMotorDatabase = Depends(get_database),
) -> PostDB:
    await database["posts"].update_one(
        {"_id": post.id}, {"$set": post_update.dict(exclude_unset=True)}
    )

    post_db = await get_post_or_404(post.id, database)

    return post_db

update_one 메서드를 사용하게 된다.

  • 첫 번째 인자: filtering query
  • 두 번째 인자: 실제 문서에 적용되는 operation
    • $set: 수정하고 싶은 필드를 딕셔너리 형태로 넘겨주면 해당되는 필드가 수정된다.
@app.delete("/posts/{id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_post(
    post: PostDB = Depends(get_post_or_404),
    database: AsyncIOMotorDatabase = Depends(get_database),
):
    await database["posts"].delete_one({"_id": post.id})

DELETE 엔드포인트는 더욱 단순하다. delete_one 메서드를 이용하고, 인자로 필터링 쿼리를 전달하면 된다.

추가적인 MongoDB의 CRUD 관련 정보는 링크 참조

Nesting Documents

이번 챕터 앞부분에서 언급이 있었듯이, 문서 지향적 데이터베이스는 하나의 Entity와 관련된 정보를 하나의 문서에 담으려고 하는 의도가 있다고 설명했었다. 이런 방식으로 구현하는 법을 알아본다.

class PostDB(PostBase):
    comments: List[CommentDB] = Field(default_factory=list)

아래의 코드는 새로운 댓글을 저장하는 엔드포인트.

@app.post(
    "/posts/{id}/comments", response_model=PostDB, status_code=status.HTTP_201_CREATED
)
async def create_comment(
    comment: CommentCreate,
    post: PostDB = Depends(get_post_or_404),
    database: AsyncIOMotorDatabase = Depends(get_database),
) -> PostDB:
    await database["posts"].update_one(
        {"_id": post.id}, {"$push": {"comments": comment.dict()}}
    )

    post_db = await get_post_or_404(post.id, database)

    return post_db

관계형 데이터베이스에서 댓글을 생성할 때는 comment 자체가 제일 큰 관심 대상이었지만, 문서 지향적 DB에서는 post가 제일 큰 관심의 대상이다. 하나의 post 내에 여러 댓글 문서가 중첩되는 형식으로 구성된다.

push operation은 list 타입인 attribute에 새로운 원소를 추가할 때 유용하다. 물론 삭제하는 operation도 존재한다.(링크)

반응형