Python Flask ve Celery ile Çalışmak¶
Bu yazımızda pratiğe dayalı olarak python flask ile celery entegrasyonunu yapmaya çalışacağız.
Aşağıdaki komutlar ile projemizi oluşturalım.
$ mkdir flask-celery
$ cd flask-celery
$ touch main.py
# environment variables icin .env olustur
$ touch .env
# sanal ortamın kurulması
$ python3.11 -m venv venv
$ source venv/bin/activate
# kütüphanelerin kurulması
(venv) $ pip install flask celery redis python-dotenv
import time
from pathlib import Path
from flask import Flask
from dotenv import load_dotenv
from celery import Celery
# load .env context (ensure that you have .env file in flask-celery/)
load_dotenv()
# BASE_DIR = flask-celery/
BASE_DIR = Path(__file__).parent
app = Flask(__name__)
@app.route("/", methods=["GET"])
def index():
return {"message": "index works!"}
(venv) $ flask run
# output ####
* Serving Flask app 'main.py'
* Debug mode: on
WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead.
* Running on http://127.0.0.1:5000
Press CTRL+C to quit
* Restarting with stat
* Debugger is active!
* Debugger PIN: 241-755-834
Celery Konfigürasyonu¶
main.py dosyasını açalım aşağıdaki kodları ekleyelim.
# önceki importlar ve kodlar
app = Flask(__name__)
########### Celery #################
REDIS_HOST = 'localhost'
REDIS_PORT = '6379'
REDIS_DB_NUM = '0'
CELERY_BROKER_URL = f'redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB_NUM}'
CELERY_RESULT_BACKEND = f'redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB_NUM}'
PARAMS_CELERY = {
"broker":CELERY_BROKER_URL,
"backend":CELERY_RESULT_BACKEND,
}
celery = Celery(__name__, **PARAMS_CELERY)
@app.route("/", methods=["GET"])
def index():
return {"message": "index works!"}
# celery task tanimla
@celery.task
def my_celery_task(*args, **kwargs):
return f"my_celery_task works!.. args:{args} , kwargs:{kwargs}"
Celery worker çalıştırmak için aşağıdaki komutu yeni bir terminalde yazalım.(venv aktif etmeyi unutmayın!)
- Üstteki komut ile celery'yi worker olarak çalıştırıyoruz ve main.py içerisinde bulunan celery instance'ı kullanacağımızı ve info seviyesinde logları görmek istediğimizi belirtiyoruz.-
Böylece hem message broker, hem result backend, hem de worker çalışmaya başlıyor.
-
Şimdi my_celery_task olarak tanımladığımız celery task fonksiyonunu çağıralım. Yeni bir terminal açarak flask shell komutunu çalıştıralım.
(venv) $ flask shell Python 3.11.0 (v3.11.0:deaf509e8f, Oct 24 2022, 14:43:23) [Clang 13.0.0 (clang-1300.0.29.30)] on darwin App: main Instance: /Users/dev/webdev/pytrinfo-projects/flask-celery/instance >>> >>> from main import my_celery_task >>> # taski calistirirken .delay() ile cagirmamiz gerekiyor >>> my_celery_task.delay() <AsyncResult: 82968f62-a007-4bac-8ca0-c6dfb10d3060> >>>
- my_celery_task import ettikten sonra .delay( ) ile celery task fonksiyonunu çağırıyoruz. AsyncResult objesinin return edildiğini görmektesiniz.
Şimdi celery çalıştırdığınız terminale geçerek çıktıları kontrol edin. Aşağıdaki gibi bir sonuç görmeniz beklenir.
[2023-08-04 22:48:59,887: INFO/MainProcess] mingle: searching for neighbors
[2023-08-04 22:49:00,904: INFO/MainProcess] mingle: all alone
[2023-08-04 22:49:00,933: INFO/MainProcess] [email protected] ready.
[2023-08-04 22:49:57,727: INFO/MainProcess] Task main.my_celery_task[82968f62-a007-4bac-8ca0-c6dfb10d3060] received
[2023-08-04 22:49:57,736: INFO/ForkPoolWorker-8] Task main.my_celery_task[82968f62-a007-4bac-8ca0-c6dfb10d3060] succeeded in 0.00804120791144669s: 'my_celery_task works!.. args:() , kwargs:{}'
>>> my_celery_task.delay()
<AsyncResult: 82968f62-a007-4bac-8ca0-c6dfb10d3060>
>>>
>>> my_celery_task.delay("test", name="Adnan Kaya")
<AsyncResult: 74b94ec7-836e-436f-a4b3-f44fff2c2949>
>>>
[2023-08-04 22:48:59,887: INFO/MainProcess] mingle: searching for neighbors
[2023-08-04 22:49:00,904: INFO/MainProcess] mingle: all alone
[2023-08-04 22:49:00,933: INFO/MainProcess] [email protected] ready.
[2023-08-04 22:49:57,727: INFO/MainProcess] Task main.my_celery_task[82968f62-a007-4bac-8ca0-c6dfb10d3060] received
[2023-08-04 22:49:57,736: INFO/ForkPoolWorker-8] Task main.my_celery_task[82968f62-a007-4bac-8ca0-c6dfb10d3060] succeeded in 0.00804120791144669s: 'my_celery_task works!.. args:() , kwargs:{}'
[2023-08-04 22:55:21,917: INFO/MainProcess] Task main.my_celery_task[74b94ec7-836e-436f-a4b3-f44fff2c2949] received
[2023-08-04 22:55:21,922: INFO/ForkPoolWorker-8] Task main.my_celery_task[74b94ec7-836e-436f-a4b3-f44fff2c2949] succeeded in 0.0032572918571531773s: 'my_celery_task works!.. args:(\'test\',) , kwargs:{\'name\': \'Adnan Kaya\'}'
# önceki kodlar
def long_running_computing():
time.sleep(10)
return "3.14159141591415914159141591415914159"
@app.route("/compute", methods=["GET"])
def compute():
result = "Nothing computed"
result = long_running_computing()
return {"message": f"computing -> {result}"}
Tarayıcınızdan http://127.0.0.1:5000/compute adresine gidin. 10 saniye bekledikten sonra aşağıdaki sonucu göreceksiniz:
- Şimdi web uygulamanızda ziyaretçinin bu sayfaya geldiğini veya bir şekilde burayı tetiklediğini düşünün 10 saniye gibi uzun bir süre bekleyecektir. Benzer bir senaryo olarak bu sayfada email gönderildiğini veya harici bir API'dan veri alındığını veya uzun hesaplamaların yapıldığını ve kullanıcının sayfanın yüklenmesini beklemek zorunda kaldığını düşünün. Kullanıcı ilk olarak sayfada bir problem olup olmadığını düşünecektir. Tekrar sayfayı yenileyebilir. Tekrar tekrar butona basabilir. Bu istenmeyen bir durumdur. Ayrıca diğer kullanıcılar da sürekli beklemek zorunda kalacaktır.Herhangi bir hesaplama yapılıyorsa en azından kullanıcı bilgilendirilmelidir ve başka bir yerde sonuç gösterilmelidir.
Senaryomuzu şöyle değiştirelim. Hesaplama işlemi arka planda yapılsın. Kullanıcı istek attığında direkt cevap olarak kendisine bir id verelim. Bu id ile task çalışma durumunu ve sonucu görüntüleyebilsin.
main.py dosyasını açalım ve aşağıdaki ekleme ve düzenlemeleri yapalım.
# önceki kodlar
def long_running_computing():
time.sleep(10)
return "3.14159141591415914159141591415914159"
# new
@celery.task
def my_fast_task(*args, **kwargs):
return long_running_computing()
@app.route("/compute", methods=["GET"])
def compute():
result_id = my_fast_task.delay()# edited
return {"message": f"computing -> id -> {result_id}"} # edited
# new
@app.route("/compute/<string:id>", methods=["GET"])
def compute_result(id):
async_res = my_fast_task.AsyncResult(id)
result = "None"
if async_res.state == "SUCCESS":
result = async_res.get()
return {"message": f"({async_res.state}) computing -> {result}"}