Algunos proyectos requieren de ejecutar tareas o porciones de código en segundo plano, esto no solo es necesario para procesos pesados, si no que en algunos casos se puede dar que dependan de servicios de terceros que tienen poca disponibilidad, por lo tanto el cliente no puede quedarse esperando a una respuesta por que no sería para nada agradable estar en un interfaz que no termina de cargar, para atacar este inconveniente al cliente se le da una respuesta parcial inmediata mientras el proceso se ejecuta en segundo plano o en background un ejemplo pensando en la arquitectura de celery se aprecia en la Figura 1

Figura 1: Arquitectura de funcionamiento de celery

Cómo funciona

En la figura anterior pudimos apreciar algunos componentes que hacen parte de la arquitectura de integración de la herramienta celery, estos componentes los explicare a continuación

API

Este componente no es necesario dentro de la arquitectura de celery, en este caso funciona para permitirle al cliente enviar información y apartir de aqui este componente ejecutara las tareas con los parámetros necesarios

Task

Estas tareas son porciones de código, generalmente son métodos que se llaman y reciben unos parámetros, estos parámetros y el nombre de la tarea son serializados y enviados a una cola donde son puestos en espera a ser ejecutadas

Brokers

Este componente será el encargado de comunicar qué tareas han sido agregadas a espera de ejecución para que el componente worker se encargue de ejecutarlas algunas herramientas usadas aquí pueden ser Redis, RabbitMQ, Amazon SQS

Workers

Este componente puede considerarse una instancia en ejecución de tu proyecto o parte del proyecto, en el cual se encuentran todas las dependencias instaladas, prácticamente es una app dentro de un entorno ejecutándose

Database

El componente que se encarga de guardar los resultados de ejecutar estas tareas, en algunos casos no es necesario ya que por medio de otros mecanismos se puede llegar a notificar el estado de una tarea, las herramientas usadas puede ser un gestor de base de datos como MYSQL o PostgreSQL, también se puede utilizar Redis en la documentación de celery se menciona redis como los mas usados

los componentes mencionados anteriormente podrían ser lo más importantes a la hora implementar una solución con celery, la idea con este artículo es introducir al funcionamiento para seguir desarrollando un proyecto en donde se logre integrar con diferentes herramientas como AWS SES, S3, RabbitMQ y  Redis una arquitectura similar a la que se aprecia en la Figura 2

Figura 2: Arquitectura propuesta de proyecto futuro

Ejemplo introductorio

Para empezar a entender un poco mejor pasemos al código, siguiendo la linea de usar poetry instalare las siguientes librerías

instalaremos celery usando el siguiente comando

poetry add celery

para crear la API que recibirá las peticiones del cliente utilizaremos FastAPI

poetry add fastapi

e instalo uvicorn para ejecutar la aplicación de FastAPI

poetry add uvicorn[standard]

para leer el contenido del excel utilizare openpyxl

poetry add openpyxl

finalmente para usar redis como almacenamiento de el resultado de la tarea se debe instalar el controlador

poetry add redis

adicionalmente ejecutaré con docker una instancia de RabbitMQ y otra de Redis

docker run -d -p 5672:5672  --name rabbit-broker rabbitmq:3
docker run --name redis-back -p 6379:6379 -d redis

con las herramientas instaladas creare un archivo donde estará la configuración de celery yo lo llamare celery_config.py

from celery import Celery

app = Celery(
     'myceleryapp',
    broker='amqp://guest@localhost//',
    backend='redis://localhost',
    include=["tasks.tasks"]

)

creare un folder llamado tasks que contendrá las tareas que ejecutará, en este ejemplo solo ejecutare una tarea como se muestra en el siguiente código en un archivo llamado tasks.py

from celery_config import app
from celery.utils.log import get_logger
from openpyxl import load_workbook

celery_log = get_logger(__name__)


@app.task(name="show_info_excel")
def show_info_excel(location: str):
    workbook_with_data = load_workbook(filename=location, data_only=True)
    first_sheet = workbook_with_data.sheetnames[0]
    sheet_range = workbook_with_data[first_sheet]
    for row_cell in sheet_range.iter_rows(min_row=2):
        user_info = " "
        for cell in row_cell:
            user_info = user_info + str(cell.value) + " "
        celery_log.info(f"Info {user_info}")
    return "OK"

En el código anterior se importa la variable app del archivo de configuración y con ella usando el decorator @app se declaran las tareas aquí se declara una tarea llamada show_info_excel que abre un excel utilizando openpyxl y recurre las celdas imprimiendo la información de cada campo.

Ahora para recibir ese excel como una api se usa FastAPI con el siguiente codigo en una rchivo llamado main.py

from fastapi import FastAPI, File
from tasks.tasks import show_info_excel

app = FastAPI()


@app.post("/file/")
def create_file(file: bytes = File(...)):
    location_users_file = "users.xlsx"
    with open(location_users_file, "wb+") as users_file:
        users_file.write(file)
    show_info_excel.delay(location_users_file)
    return {"ok"}

en el código anterior se instancia un método que recibirá un archivo como parámetro este archivo será escrito en disco, y lo interesante es que se llama la tarea declarada anteriormente y utilizando el método delay se ejecutará la tarea y se le pasaran los parámetros, al hacer esto como se vio en la Figura 1 se serializa la tarea a ejecutar y sus parámetros estos se  envían al broker el cual el worker consultara y  ejecutará

para ejecutar el componente worker se utilizara el siguiente comando

poetry run celery -A celery_config worker --loglevel=INFO

Esto ejecutara un worker celery pasandole como parametro la variable app que se encuentra en al rchivo llamado celery_config.py y se vera algo similar a lo de la Figura 3

Figura 3: Salida por consola al ejecutar el el worker de celery

ahora ejecutare el componente API

poetry run uvicorn main:app --reload

y enviare el archivo  como se muestra en las Figura 4 y Figura 5

Figura 4: Archivo de excel con información de usuarios
Figura 5: Archivo de excel enviado a la API

al enviar el archivo se escribe en disco y se ejecuta la tarea mostrando la siguiente salida en la consola de ejecución del worker

Figura 6: Salida por consola al ejecutar la tarea de manera exitosa

La respuesta en el API es inmediata, sin embargo en background se lee la información y posteriormente se imprime, con esto permite al usuario obtener una respuesta en milésimas de segundo independientemente de que tan pesado sea ejecutar esta porcion de codigo

Para finalizar dejo una imagen de la estructura del proyecto utilizado

Figura 7: Estructura final del proyecto

Conclusión

Muchas tareas a lo largo de un proyecto requerirán bien sea de procesamientos pesados, procesamiento que depende de otros servicios e incluso tareas periódicas que se ejecutan en unas fechas dadas, celery ayuda con una configuración no muy compleja a lograr enfrentar estos casos, por lo tanto en los próximos artículos seguiremos hablando de estas herramientas y cómo implementar casos más reales.