개발/오늘 배운 지식

[FastAPI/Python] FastAPI와 데이터 베이스

Woogie2 2022. 8. 8. 22:05
반응형

Chapter 6: 데이터 베이스와 비동기 ORMs

Building Data Science Applications with FastAPI”라는 책을 읽고 정리한 내용입니다.

REST API를 만드는 주된 이유: 데이터를 읽고 쓰기 위함.

하지만, 아직 데이터베이스에 대해서는 책에서 다루지 않았음.

  • ORM: Object-Relational Mapping

Relational vs NoSQL 데이터베이스

  • 데이터베이스는 체계적으로 데이터를 저장하고, 데이터의 무결성을 유지하며, query language를 통해 필요할 때 데이터를 찾을 수 있도록 해줘야 한다.
  • Relational Database(관계형 데이터베이스): SQL query language
  • NoSQL: Relational DB의 반대 케이스
  • 필요에 따라 적절한 기술 선택하는 것이 필요하다.

관계형 DB

  • SQL 쿼리 언어와 함께 사용
  • relational model을 구현한다
    • entity or object of application is stored in tables
    • 모든 것이 table 형태로 저장
    • 각 table의 column은 entity의 attribute를 나타냄
    • table의 row는 하나의 entity를 뜻한다.
    • 각 table들은 서로 relationship을 가진다.
      • 관계를 만드는 주된 동기는 avoid duplication
  • 보통, 관계형 DB의 각 row는 primary key라는 identifier를 가진다.
    • unique in the table → uniquely identify this row
    • 다른 table이 이 row를 찾는 데 사용한다면 foreign key가 되는 것임
      • 즉, 다른 table의 primary key를 현재 table에 저장하고 있는 것.
  • Schema가 너무 복잡해지면, foreign key로 찾아서 join 하는 것이 오히려 더 비효율 적일 수 있다.

NoSQL DB

  • Not relational DB ⇒ fallback to NoSQL
    • Key-Value stores: Redis
    • Graph DB: Neo4j
    • Document-oriented DB: MongoDB
  • 현재 이 책에서 NoSQL을 지칭할 때는 위의 세 가지 중 마지막인 Document-oriented DB를 지칭함.
  • 문서 지향 DB는 하나의 문서 내에 모든 정보를 저장한다는 컨셉이다
    • 따라서, join query를 더 적게 수행하고, join이 어려움.
  • documents들은 collections에 저장된다.
    • 각 문서는 서로 다른 attribute를 가지고 있을 수 있다(일관성이 보장되지 않음)
  • 예시: 게시물에 달리는 댓글을 저장할 때, 댓글 컬렉션을 따로 구별하지 않고, 게시물 document 내에 comments list를 저장하는 것.
    • 거대한 스케일, 덜 구조적인 데이터를 저장할 때, 적합하다.
      • Social networks

어떤 것을 선택할까?

  • 관계형 DB
    • 구조적으로 데이터 저장 가능
    • 데이터가 일관성을 유지할 수 있음
    • 하지만, 스키마를 정의할 때, 세심하게 진행해야 함.
    • schema 수정이 힘들다
  • Document-oriented DB
    • 스키마를 정의할 필요가 없다.
    • 아직 프로젝트가 성숙하지 않았을 때, 유연한 변경이 가능
    • 데이터 일관성 유지가 힘들다.
  • 작거나 중간 정도의 앱에서는 두 가지 모두 좋은 성능을 보여준다.

관계형 DB를 사용하는 경우

  • SQLAlchemy 사용
  • Encode를 활용해서 비동기 통신 제어
    • 여기서는 sqlite를 사용
    • production level에서는 mysql, postgre-sql 등을 사용 가능하다

먼저, 라이브러리를 가상 환경에 설치하자.

pip install databases[sqlite]

테이블 스키마 생성하기

metadata = sqlalchemy.MetaData()

posts = sqlalchemy.Table(

    "posts",

    metadata,

    sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True, autoincrement=True),

    sqlalchemy.Column("publication_date", sqlalchemy.DateTime(), nullable=False),

    sqlalchemy.Column("title", sqlalchemy.String(length=255), nullable=False),

    sqlalchemy.Column("content", sqlalchemy.Text(), nullable=False),

)

먼저 메타데이터 객체를 생성한다. DB 자체의 정보를 담고 있다. 전체 프로젝트에서 딱 한번 생성 후, 같은 객체에 접근하게 된다.

sqlalchemy의 Table 클래스를 이용하여 Table을 생성하고, Column 클래스를 이용하여 entity의 attribute를 생성할 수 있다.

데이터베이스 연결하기

DATABASE_URL = "sqlite:///chapter6_sqlalchemy.db"

database = Database(DATABASE_URL)

sqlalchemy_engine = sqlalchemy.create_engine(DATABASE_URL)

여기서 DATABASE_URLpostgresql+pg8000://dbuser:kx%25jj5%2Fg**@pghost10**/appdb 처럼 데이터베이스 엔진, auth 정보와 호스트 이름 등이 적혀있다. sqlite는 db file의 path만 간단히 입력해두면 된다.

def get_database() -> Database:

    return database

위의 코드는 DB 인스턴스를 받아오는 Dependency를 구현한 것이다. 이렇게 구현하는 것의 장점은 Unit Test가 쉬워지기 때문이다.

@app.on_event("startup")
async def startup():

    await database.connect()

    metadata.create_all(sqlalchemy_engine)

@app.on_event("shutdown")
async def shutdown():

    await database.disconnect()

그리고 FastAPI는 startup과 shutdown 이벤트를 위한 데코레이터를 제공한다.

Insert 쿼리 만들기

@app.post("/posts", response_model=PostDB, status_code=status.HTTP_201_CREATED)
async def create_post(

    post: PostCreate, database: Database = Depends(get_database)

) -> PostDB:

    insert_query = posts.insert().values(post.dict())

    post_id = await database.execute(insert_query)

    post_db = await get_post_or_404(post_id, database)

    return post_db
  • SQLAlchemy에서 제공하는 메서드로 쿼리를 만들면, DB를 다른 엔진으로 바꿔도 코드 변경 없이 사용할 수 있다
  • 그리고 쿼리가 posts라는 Table instance를 활용하였기 때문에 SQLAlchemy가 바로 어떤 테이블에 쿼리를 만들었는지 알 수 있다
  • post.dict()를 통해 편리하게 값들을 설정할 수 있다. 이렇게 하기 위해서는 Pydantic Model과 Table schema가 통일성 있게 유지되어야 한다.
  • await 구문을 통해서 비동기적으로 실행된다.
  • 마지막으로 get_post_or_404로 디비에 올바르게 저장되었는지 확인한다.

Select 쿼리

가장 기본이 되는 select 쿼리문을 만들어보자

하나의 객체만 가져오거나, 리스트 객체를 받아온다.

@app.get("/posts")
async def list_posts(

    pagination: Tuple[int, int] = Depends(pagination),

    database: Database = Depends(get_database),

) -> List[PostDB]:

    skip, limit = pagination

    select_query = posts.select().offset(skip).limit(limit)

    rows = await database.fetch_all(select_query)

    results = [PostDB(**row) for row in rows]

    return results

@app.get("/posts/{id}", response_model=PostDB)

async def get_post(post: PostDB = Depends(get_post_or_404)) -> PostDB:

    return post

get_post_or_404처럼 하나의 객체를 찾는 경우가 잦기 때문에 의존성을 구현해서 재사용성을 높이자.

async def get_post_or_404(

    id: int, database: Database = Depends(get_database)

) -> PostDB:

    select_query = posts.select().where(posts.c.id == id)

    raw_post = await database.fetch_one(select_query)

    if raw_post is None:

        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)

    return PostDB(**raw_post)

WHERE절을 이용한다면 우리가 원하는 조건의 데이터를 필터링할 수 있다. column에는 posts.c.id 로 접근할 수 있다.

Update, Delete 쿼리 만들기

@app.patch("/posts/{id}", response_model=PostDB)
async def update_post(

    post_update: PostPartialUpdate,

    post: PostDB = Depends(get_post_or_404),

    database: Database = Depends(get_database),

) -> PostDB:

    update_query = (

        posts.update()

        .where(posts.c.id == post.id)

        .values(post_update.dict(exclude_unset=True))

    )

    await database.execute(update_query)

    post_db = await get_post_or_404(post.id, database)

    return post_db

@app.delete("/posts/{id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_post(

    post: PostDB = Depends(get_post_or_404), database: Database = Depends(get_database)

):

    delete_query = posts.delete().where(posts.c.id == post.id)

    await database.execute(delete_query)

두 쿼리문 모두 특정 id를 가지는 객체에만 접근해야 하므로 WHERE 절을 활용한다. 그 외에는 거의 비슷하다.

Relationship 추가하기

comments = sqlalchemy.Table(

    "comments",

    metadata,

    sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True, autoincrement=True),

    sqlalchemy.Column(

        "post_id", sqlalchemy.ForeignKey("posts.id", ondelete="CASCADE"), nullable=False

    ),

    sqlalchemy.Column("publication_date", sqlalchemy.DateTime(), nullable=False),

    sqlalchemy.Column("content", sqlalchemy.Text(), nullable=False),

)
@app.post("/comments", response_model=CommentDB, status_code=status.HTTP_201_CREATED)

async def create_comment(

    comment: CommentCreate, database: Database = Depends(get_database)

) -> CommentDB:

    select_post_query = posts.select().where(posts.c.id == comment.post_id)

    post = await database.fetch_one(select_post_query)

    if post is None:

        raise HTTPException(

            status_code=status.HTTP_400_BAD_REQUEST, detail=f"Post {id} does not exist"

        )

    insert_query = comments.insert().values(comment.dict())

    comment_id = await database.execute(insert_query)

    select_query = comments.select().where(comments.c.id == comment_id)

    raw_comment = cast(Mapping, await database.fetch_one(select_query))

    return CommentDB(**raw_comment)

하나의 쿼리에서 연관된 다른 테이블의 데이터를 가져오는 코드

async def get_post_or_404(

    id: int, database: Database = Depends(get_database)

) -> PostPublic:

    select_post_query = posts.select().where(posts.c.id == id)

    raw_post = await database.fetch_one(select_post_query)

    if raw_post is None:

        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)

    select_post_comments_query = comments.select().where(comments.c.post_id == id)

    raw_comments = await database.fetch_all(select_post_comments_query)

    comments_list = [CommentDB(**comment) for comment in raw_comments]

    return PostPublic(**raw_post, comments=comments_list)

Alembic을 활용해서 DB 마이그레이션 시스템 세팅하기

pip install alembic로 설치 후 프로젝트 루트 경로에서 alembic init alembic 명령어를 실행하자. 깃헙에 올릴 때, 함께 업로드가 되어야 한다. Alembic에 DB의 메타데이터를 제공해주면 마이그레이션 스크립트를 자동 생성해준다. 설정 변경이 필수적임.

  • DB 스키마를 업데이트하는 마이그레이션은 극히 신중하게 수행되어야 한다.
반응형