Skip to content

Python Flask ve Celery ile Çalışmak

Bu yazımızda pratiğe dayalı olarak python flask ile celery entegrasyonunu yapmaya çalışacağız.

Aşağıdaki komutlar ile projemizi oluşturalım.

$ mkdir flask-celery
$ cd flask-celery
$ touch main.py
# environment variables icin .env olustur
$ touch .env
# sanal ortamın kurulması
$ python3.11 -m venv venv
$ source venv/bin/activate
# kütüphanelerin kurulması
(venv) $ pip install flask celery redis python-dotenv
- main.py dosyasını açalım ve aşağıdaki kodları ekleyelim
import time
from pathlib import Path
from flask import Flask
from dotenv import load_dotenv
from celery import Celery


# load .env context (ensure that you have .env file in flask-celery/)
load_dotenv()
# BASE_DIR = flask-celery/
BASE_DIR = Path(__file__).parent


app = Flask(__name__)

@app.route("/", methods=["GET"])
def index():
    return {"message": "index works!"}
- .env dosyasını aşağıdaki gibi dolduralım
FLASK_APP=main.py
FLASK_DEBUG=True
- Şimdi flask uygulamamızı çalıştıralım(flask run) ve tarayıcıdan http://127.0.0.1:5000/ adresine gidelim
(venv) $ flask run

# output ####
* Serving Flask app 'main.py'
 * Debug mode: on
WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead.
 * Running on http://127.0.0.1:5000
Press CTRL+C to quit
 * Restarting with stat
 * Debugger is active!
 * Debugger PIN: 241-755-834
- Aşağıdaki mesajı görmeniz beklenir

{
  "message": "index works!"
}

Celery Konfigürasyonu

main.py dosyasını açalım aşağıdaki kodları ekleyelim.

# önceki importlar ve kodlar

app = Flask(__name__)

########### Celery #################
REDIS_HOST = 'localhost'
REDIS_PORT = '6379'
REDIS_DB_NUM = '0'
CELERY_BROKER_URL = f'redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB_NUM}'
CELERY_RESULT_BACKEND = f'redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB_NUM}'
PARAMS_CELERY = {
    "broker":CELERY_BROKER_URL,
    "backend":CELERY_RESULT_BACKEND,
}
celery = Celery(__name__, **PARAMS_CELERY)


@app.route("/", methods=["GET"])
def index():
    return {"message": "index works!"}


# celery task tanimla
@celery.task
def my_celery_task(*args, **kwargs):
    return f"my_celery_task works!.. args:{args} , kwargs:{kwargs}"
- Redisi Docker ile çalıştırmak için aşağıdaki komutu kullanabilirsiniz
$ docker run --name redis-flask-celery -p 6379:6379 redis:latest
- Üstteki komut ile redis'in son sürümündeki imajı kullanarak 6379 portunda redis-flask-celery isminde bir container ayağa kaldırıyoruz.

Celery worker çalıştırmak için aşağıdaki komutu yeni bir terminalde yazalım.(venv aktif etmeyi unutmayın!)

(venv) $ celery --app=main.celery worker --loglevel=info
- Üstteki komut ile celery'yi worker olarak çalıştırıyoruz ve main.py içerisinde bulunan celery instance'ı kullanacağımızı ve info seviyesinde logları görmek istediğimizi belirtiyoruz.

  • Böylece hem message broker, hem result backend, hem de worker çalışmaya başlıyor.

  • Şimdi my_celery_task olarak tanımladığımız celery task fonksiyonunu çağıralım. Yeni bir terminal açarak flask shell komutunu çalıştıralım.

    (venv) $ flask shell
    Python 3.11.0 (v3.11.0:deaf509e8f, Oct 24 2022, 14:43:23) [Clang 13.0.0 (clang-1300.0.29.30)] on darwin
    App: main
    Instance: /Users/dev/webdev/pytrinfo-projects/flask-celery/instance
    >>> 
    >>> from main import my_celery_task
    >>> # taski calistirirken .delay() ile cagirmamiz gerekiyor
    >>> my_celery_task.delay()
    <AsyncResult: 82968f62-a007-4bac-8ca0-c6dfb10d3060>
    >>> 
    

  • my_celery_task import ettikten sonra .delay( ) ile celery task fonksiyonunu çağırıyoruz. AsyncResult objesinin return edildiğini görmektesiniz.

Şimdi celery çalıştırdığınız terminale geçerek çıktıları kontrol edin. Aşağıdaki gibi bir sonuç görmeniz beklenir.

[2023-08-04 22:48:59,887: INFO/MainProcess] mingle: searching for neighbors
[2023-08-04 22:49:00,904: INFO/MainProcess] mingle: all alone
[2023-08-04 22:49:00,933: INFO/MainProcess] celery@developers-MacBook-Pro.local ready.
[2023-08-04 22:49:57,727: INFO/MainProcess] Task main.my_celery_task[82968f62-a007-4bac-8ca0-c6dfb10d3060] received
[2023-08-04 22:49:57,736: INFO/ForkPoolWorker-8] Task main.my_celery_task[82968f62-a007-4bac-8ca0-c6dfb10d3060] succeeded in 0.00804120791144669s: 'my_celery_task works!.. args:() , kwargs:{}'
- Şimdi my_celery_task taskına parametre vererek tekrar çağıralım
>>> my_celery_task.delay()
<AsyncResult: 82968f62-a007-4bac-8ca0-c6dfb10d3060>
>>>
>>> my_celery_task.delay("test", name="Adnan Kaya")
<AsyncResult: 74b94ec7-836e-436f-a4b3-f44fff2c2949>
>>> 
- Celery terminalden sonucu görelim
[2023-08-04 22:48:59,887: INFO/MainProcess] mingle: searching for neighbors
[2023-08-04 22:49:00,904: INFO/MainProcess] mingle: all alone
[2023-08-04 22:49:00,933: INFO/MainProcess] celery@developers-MacBook-Pro.local ready.
[2023-08-04 22:49:57,727: INFO/MainProcess] Task main.my_celery_task[82968f62-a007-4bac-8ca0-c6dfb10d3060] received
[2023-08-04 22:49:57,736: INFO/ForkPoolWorker-8] Task main.my_celery_task[82968f62-a007-4bac-8ca0-c6dfb10d3060] succeeded in 0.00804120791144669s: 'my_celery_task works!.. args:() , kwargs:{}'
[2023-08-04 22:55:21,917: INFO/MainProcess] Task main.my_celery_task[74b94ec7-836e-436f-a4b3-f44fff2c2949] received
[2023-08-04 22:55:21,922: INFO/ForkPoolWorker-8] Task main.my_celery_task[74b94ec7-836e-436f-a4b3-f44fff2c2949] succeeded in 0.0032572918571531773s: 'my_celery_task works!.. args:(\'test\',) , kwargs:{\'name\': \'Adnan Kaya\'}'
- Şimdi main.py dosyasına uzun süren işlemleri temsil edecek birkaç ekleme yapalım.

# önceki kodlar

def long_running_computing():
    time.sleep(10)
    return "3.14159141591415914159141591415914159"


@app.route("/compute", methods=["GET"])
def compute():
    result = "Nothing computed"
    result = long_running_computing()
    return {"message": f"computing -> {result}"}
- Yukarıdaki long_running_computing fonksiyonu 10 saniye bekledikten sonra "3.14159..." string değerini dönmektedir.

Tarayıcınızdan http://127.0.0.1:5000/compute adresine gidin. 10 saniye bekledikten sonra aşağıdaki sonucu göreceksiniz:

{
  "message": "computing -> 3.14159141591415914159141591415914159"
}
- Şimdi web uygulamanızda ziyaretçinin bu sayfaya geldiğini veya bir şekilde burayı tetiklediğini düşünün 10 saniye gibi uzun bir süre bekleyecektir. Benzer bir senaryo olarak bu sayfada email gönderildiğini veya harici bir API'dan veri alındığını veya uzun hesaplamaların yapıldığını ve kullanıcının sayfanın yüklenmesini beklemek zorunda kaldığını düşünün. Kullanıcı ilk olarak sayfada bir problem olup olmadığını düşünecektir. Tekrar sayfayı yenileyebilir. Tekrar tekrar butona basabilir. Bu istenmeyen bir durumdur. Ayrıca diğer kullanıcılar da sürekli beklemek zorunda kalacaktır.

Herhangi bir hesaplama yapılıyorsa en azından kullanıcı bilgilendirilmelidir ve başka bir yerde sonuç gösterilmelidir.

Senaryomuzu şöyle değiştirelim. Hesaplama işlemi arka planda yapılsın. Kullanıcı istek attığında direkt cevap olarak kendisine bir id verelim. Bu id ile task çalışma durumunu ve sonucu görüntüleyebilsin.

main.py dosyasını açalım ve aşağıdaki ekleme ve düzenlemeleri yapalım.

# önceki kodlar
def long_running_computing():
    time.sleep(10)
    return "3.14159141591415914159141591415914159"

# new
@celery.task
def my_fast_task(*args, **kwargs): 
    return long_running_computing()

@app.route("/compute", methods=["GET"])
def compute():
    result_id = my_fast_task.delay()# edited
    return {"message": f"computing -> id -> {result_id}"} # edited
- my_fast_task adında bir celery task fonksiyon tanımladık ve compute fonksiyonunda bunu çağırdık. Kullanıcıya da task id'sini result_id cevap olarak döndük. Kullanıcı compute sayfasını ziyaret ettiğinde aşağıdaki gibi bir sonuç görecektir.

{
  "message": "computing -> id -> 2c8431e5-db37-4d44-9548-982e8ebdbd6f"
}
- Şimdi bu task id ile kullanıcının hesaplama sonucunu ve durumu görebileceği sonuç sayfası için ekleme ve düzenleme yapalım. main.py

# new
@app.route("/compute/<string:id>", methods=["GET"])
def compute_result(id): 
    async_res = my_fast_task.AsyncResult(id)
    result = "None"
    if async_res.state == "SUCCESS":
        result = async_res.get()
    return {"message": f"({async_res.state}) computing -> {result}"}
- Kullanıcı aldığı id ile http://127.0.0.1:5000/compute/2c8431e5-db37-4d44-9548-982e8ebdbd6f adresine giderse 10 saniye sonunda aşağıdaki gibi bir sonuç görür

{
  "message": "(SUCCESS) computing -> 3.14159141591415914159141591415914159"
}
- Eğer task çalışmasını bitirmeden yani 10 saniye dolmadan ziyaret ederse aşağıdaki gibi bir sonuç görür

{
  "message": "(PENDING) computing -> None"
}