Algunos proyectos requieren de ejecutar tareas o porciones de código en segundo plano, esto no solo es necesario para procesos pesados, si no que en algunos casos se puede dar que dependan de servicios de terceros que tienen poca disponibilidad, por lo tanto el cliente no puede quedarse esperando a una respuesta por que no sería para nada agradable estar en un interfaz que no termina de cargar, para atacar este inconveniente al cliente se le da una respuesta parcial inmediata mientras el proceso se ejecuta en segundo plano o en background un ejemplo pensando en la arquitectura de celery se aprecia en la Figura 1
Cómo funciona
En la figura anterior pudimos apreciar algunos componentes que hacen parte de la arquitectura de integración de la herramienta celery, estos componentes los explicare a continuación
API
Este componente no es necesario dentro de la arquitectura de celery, en este caso funciona para permitirle al cliente enviar información y apartir de aqui este componente ejecutara las tareas con los parámetros necesarios
Task
Estas tareas son porciones de código, generalmente son métodos que se llaman y reciben unos parámetros, estos parámetros y el nombre de la tarea son serializados y enviados a una cola donde son puestos en espera a ser ejecutadas
Brokers
Este componente será el encargado de comunicar qué tareas han sido agregadas a espera de ejecución para que el componente worker se encargue de ejecutarlas algunas herramientas usadas aquí pueden ser Redis, RabbitMQ, Amazon SQS
Workers
Este componente puede considerarse una instancia en ejecución de tu proyecto o parte del proyecto, en el cual se encuentran todas las dependencias instaladas, prácticamente es una app dentro de un entorno ejecutándose
Database
El componente que se encarga de guardar los resultados de ejecutar estas tareas, en algunos casos no es necesario ya que por medio de otros mecanismos se puede llegar a notificar el estado de una tarea, las herramientas usadas puede ser un gestor de base de datos como MYSQL o PostgreSQL, también se puede utilizar Redis en la documentación de celery se menciona redis como los mas usados
los componentes mencionados anteriormente podrían ser lo más importantes a la hora implementar una solución con celery, la idea con este artículo es introducir al funcionamiento para seguir desarrollando un proyecto en donde se logre integrar con diferentes herramientas como AWS SES, S3, RabbitMQ y Redis una arquitectura similar a la que se aprecia en la Figura 2
Ejemplo introductorio
Para empezar a entender un poco mejor pasemos al código, siguiendo la linea de usar poetry instalare las siguientes librerías
instalaremos celery usando el siguiente comando
poetry add celery
para crear la API que recibirá las peticiones del cliente utilizaremos FastAPI
poetry add fastapi
e instalo uvicorn para ejecutar la aplicación de FastAPI
poetry add uvicorn[standard]
para leer el contenido del excel utilizare openpyxl
poetry add openpyxl
finalmente para usar redis como almacenamiento de el resultado de la tarea se debe instalar el controlador
poetry add redis
adicionalmente ejecutaré con docker una instancia de RabbitMQ y otra de Redis
docker run -d -p 5672:5672 --name rabbit-broker rabbitmq:3
docker run --name redis-back -p 6379:6379 -d redis
con las herramientas instaladas creare un archivo donde estará la configuración de celery yo lo llamare celery_config.py
from celery import Celery
app = Celery(
'myceleryapp',
broker='amqp://guest@localhost//',
backend='redis://localhost',
include=["tasks.tasks"]
)
creare un folder llamado tasks que contendrá las tareas que ejecutará, en este ejemplo solo ejecutare una tarea como se muestra en el siguiente código en un archivo llamado tasks.py
from celery_config import app
from celery.utils.log import get_logger
from openpyxl import load_workbook
celery_log = get_logger(__name__)
@app.task(name="show_info_excel")
def show_info_excel(location: str):
workbook_with_data = load_workbook(filename=location, data_only=True)
first_sheet = workbook_with_data.sheetnames[0]
sheet_range = workbook_with_data[first_sheet]
for row_cell in sheet_range.iter_rows(min_row=2):
user_info = " "
for cell in row_cell:
user_info = user_info + str(cell.value) + " "
celery_log.info(f"Info {user_info}")
return "OK"
En el código anterior se importa la variable app del archivo de configuración y con ella usando el decorator @app se declaran las tareas aquí se declara una tarea llamada show_info_excel que abre un excel utilizando openpyxl y recurre las celdas imprimiendo la información de cada campo.
Ahora para recibir ese excel como una api se usa FastAPI con el siguiente codigo en una rchivo llamado main.py
from fastapi import FastAPI, File
from tasks.tasks import show_info_excel
app = FastAPI()
@app.post("/file/")
def create_file(file: bytes = File(...)):
location_users_file = "users.xlsx"
with open(location_users_file, "wb+") as users_file:
users_file.write(file)
show_info_excel.delay(location_users_file)
return {"ok"}
en el código anterior se instancia un método que recibirá un archivo como parámetro este archivo será escrito en disco, y lo interesante es que se llama la tarea declarada anteriormente y utilizando el método delay se ejecutará la tarea y se le pasaran los parámetros, al hacer esto como se vio en la Figura 1 se serializa la tarea a ejecutar y sus parámetros estos se envían al broker el cual el worker consultara y ejecutará
para ejecutar el componente worker se utilizara el siguiente comando
poetry run celery -A celery_config worker --loglevel=INFO
Esto ejecutara un worker celery pasandole como parametro la variable app que se encuentra en al rchivo llamado celery_config.py y se vera algo similar a lo de la Figura 3
ahora ejecutare el componente API
poetry run uvicorn main:app --reload
y enviare el archivo como se muestra en las Figura 4 y Figura 5
al enviar el archivo se escribe en disco y se ejecuta la tarea mostrando la siguiente salida en la consola de ejecución del worker
La respuesta en el API es inmediata, sin embargo en background se lee la información y posteriormente se imprime, con esto permite al usuario obtener una respuesta en milésimas de segundo independientemente de que tan pesado sea ejecutar esta porcion de codigo
Para finalizar dejo una imagen de la estructura del proyecto utilizado
Conclusión
Muchas tareas a lo largo de un proyecto requerirán bien sea de procesamientos pesados, procesamiento que depende de otros servicios e incluso tareas periódicas que se ejecutan en unas fechas dadas, celery ayuda con una configuración no muy compleja a lograr enfrentar estos casos, por lo tanto en los próximos artículos seguiremos hablando de estas herramientas y cómo implementar casos más reales.