[Python] Celery로 비동기 실시간 데이터 적재 및 분석 처리

2022. 9. 9. 00:16TIL💡/Database

데이터 분석을 위해 데이터를 적재한 뒤, 스케줄러를 돌리며 주기적으로 데이터를 분석하는 시스템을 실시간(Realtime) 처리 시스템으로 바꾸는 일에 투입되었다.

 

이를 위해 Celery라는 비동기 분산 큐 처리 라이브러리를 사용해보는 일에 도전했다. 이는 해야하는 일(task)이 즉시 이루어지지 않는 경우에 유리한 라이브러리이다. 예를 들어 문자 메시지를 전송하는 일이나 다수의 태스크가 동시 다발적으로 발생할 경우 작업이 누락되거나 중복되어 수행될 수 있기 때문에 주의 깊은 처리가 필요하다. 이러한 태스크를 위해 대기 중인 작업들을 관리하고, Worker에 작업을 제대로 전달하도록 Broker를 활용한다.

Broker는 기본적으로 Queue라는 자료구조를 사용해 task를 보관한 뒤, Worker에 작업을 배분한다.

작업이 끝난 결과는 result backend에 저장된다. 브로커에 쓰이는 데이터베이스는 종류가 다양하나, 편의를 위해 기존의 기술 스택이었던 Redis를 사용하였다.

 

http://blog.adnansiddiqi.me/getting-started-with-celery-and-python/

 

요구사항 중에는 단순히 결과가 데이터베이스에 적재되는 것뿐만 아니라 적재되는 경우 다른 프로세스에서 이를 알아채고 결과를 확인할 수 있어야 한다고 했다. 이를 위해서 Celery에 내재된 이벤트 리스너를 사용하였다. 우선 PoC 수준의 작업이었기에 이벤트가 발생하는 경우를 확인할 수 있는 수준으로 개발하였다.

https://github.com/dleunji/celery-redis-queue

 

GitHub - dleunji/celery-redis-queue: Celery and Redis Queue in FastAPI

Celery and Redis Queue in FastAPI. Contribute to dleunji/celery-redis-queue development by creating an account on GitHub.

github.com

더 고급지게 Queue를 쓴다면 교착상태를 고려해야 한다. 만약 이 점을 고려해야 한다면 아래의 스포카 기술 블로그를 참고하면 좋을 것 같다. 그리고 만약 해당 기술을 쭉 발전시키게 된다면 더욱 robust한 비동기 처리 시스템을 만들어보고 싶다.

 

Reference

- https://testdriven.io/courses/fastapi-celery/getting-started/

- https://velog.io/@anjaekk/Python-Celery

- 스포카 기술 블로그 - Celery를 이용한 긴 작업 처리