django框架请求/响应的过程是同步的,框架本身无法实现异步响应。
但是我们在项目过程中会经常会遇到一些耗时的任务, 比如:发送邮件、发送短信、大数据统计等等,这些操作耗时长,同步执行对用户体验非常不友好,那么在这种情况下就需要实现异步执行。
异步执行前端一般使用ajax,后端使用Celery。
django项目应用celery,主要有两种任务方式,一是异步任务(发布者任务),一般是web请求,二是定时任务。
celery组成
Celery是由Python开发、简单、灵活、可靠的分布式任务队列,是一个处理异步任务的框架,其本质是生产者消费者模型,生产者发送任务到消息队列,消费者负责处理任务。Celery侧重于实时操作,但对调度支持也很好,其每天可以处理数以百万计的任务。特点:
简单:熟悉celery的工作流程后,配置使用简单
高可用:当任务执行失败或执行过程中发生连接中断,celery会自动尝试重新执行任务
快速:一个单进程的celery每分钟可处理上百万个任务
灵活:几乎celery的各个组件都可以被扩展及自定制
Celery由三部分构成:
消息中间件(Broker):官方提供了很多备选方案,支持RabbitMQ、Redis、Amazon SQS、MongoDB、Memcached 等,官方推荐RabbitMQ
任务执行单元(Worker):任务执行单元,负责从消息队列中取出任务执行,它可以启动一个或者多个,也可以启动在不同的机器节点,这就是其实现分布式的核心
结果存储(Backend):官方提供了诸多的存储方式支持:RabbitMQ、 Redis、Memcached,SQLAlchemy, Django ORM、Apache Cassandra、Elasticsearch等
架构如下:
工作原理:
任务模块Task包含异步任务和定时任务。其中,异步任务通常在业务逻辑中被触发并发往消息队列,而定时任务由Celery Beat进程周期性地将任务发往消息队列;
任务执行单元Worker实时监视消息队列获取队列中的任务执行;
Woker执行完任务后将结果保存在Backend中;
本文使用的是redis数据库作为消息中间件和结果存储数据库
1.安装库
pip install celery pip install redis
2.celery.py
在主项目目录下,新建 celery.py 文件:
import os import django from celery import Celery from django.conf import settings # 设置系统环境变量,安装django,必须设置,否则在启动celery时会报错 # celery_study 是当前项目名 os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celery_study.settings') django.setup() celery_app = Celery('celery_study') celery_app.config_from_object('django.conf:settings') celery_app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
注意:是和settings.py文件同目录,一定不能建立在项目根目录,不然会引起 celery 这个模块名的命名冲突
同时,在主项目的init.py中,添加如下代码:
from .celery import celery_app __all__ = ['celery_app']
3.settings.py
在配置文件中配置对应的redis配置:
# Broker配置,使用Redis作为消息中间件 BROKER_URL = 'redis://127.0.0.1:6379/0' # BACKEND配置,这里使用redis CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0' # 结果序列化方案 CELERY_RESULT_SERIALIZER = 'json' # 任务结果过期时间,秒 CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 时区配置 CELERY_TIMEZONE='Asia/Shanghai' # 指定导入的任务模块,可以指定多个 #CELERY_IMPORTS = ( # 'other_dir.tasks', #)
注意:所有配置的官方文档:Configuration and defaults — Celery 5.2.0b3 documentation
4.tasks.py
在子应用下建立各自对应的任务文件tasks.py(必须是tasks.py这个名字,不允许修改)
from celery import shared_task @shared_task def add(x, y): return x + y @shared_task def mul(x, y): return x * y @shared_task def xsum(numbers): return sum(numbers)
5.调用任务
from .tasks import * # Create your views here. def task_add_view(request): add.delay(100,200) return HttpResponse(f'调用函数结果')
6.启动celery
pip install eventlet
celery -A celery_study worker -l debug -P eventlet
注意 :celery_study是项目名
使用redis时,有可能会出现如下类似的异常
AttributeError: 'str' object has no attribute 'items'
这是由于版本差异,需要卸载已经安装的python环境中的 redis 库,重新指定安装特定版本(celery4.x以下适用 redis2.10.6, celery4.3以上使用redis3.2.0以上):
xxxxxxxxxx pip install redis==2.10.6
7.获取任务结果
在 views.py 中,通过 AsyncResult.get() 获取结果
from celery import result def get_result_by_taskid(request): task_id = request.GET.get('task_id') # 异步执行 ar = result.AsyncResult(task_id) if ar.ready(): return JsonResponse({'status': ar.state, 'result': ar.get()}) else: return JsonResponse({'status': ar.state, 'result': ''})
AsyncResult类的常用的属性和方法:
在第一步的异步任务的基础上,进行部分修改即可
1.settings.py
from celery.schedules import crontab CELERYBEAT_SCHEDULE = { 'mul_every_30_seconds': { # 任务路径 'task': 'celery_app.tasks.mul', # 每30秒执行一次 'schedule': 5, 'args': (14, 5) } }
说明(更多内容见文档:Periodic Tasks — Celery 5.2.0b3 documentation):
在task.py中设置了日志
from celery import shared_task import logging logger = logging.getLogger(__name__)) @shared_task def mul(x, y): logger.info('___mul__'*10) return x * y
2.启动celery
(两个cmd)分别启动worker和beat
celery -A worker celery_study -l debug -P eventlet celery beat -A celery_study -l debug
Celery可通过task绑定到实例获取到task的上下文,这样我们可以在task运行时候获取到task的状态,记录相关日志等
方法:
在task.py 里面写
from celery import shared_task import logging logger = logging.getLogger(__name__) # 任务绑定 @shared_task(bind=True) def add(self,x, y): logger.info('add__-----'*10) logger.info('name:',self.name) logger.info('dir(self)',dir(self)) return x + y
其中:self对象是celery.app.task.Task的实例,可以用于实现重试等多种功能
from celery import shared_task import logging logger = logging.getLogger(__name__) # 任务绑定 @shared_task(bind=True) def add(self,x, y): try: logger.info('add__-----'*10) logger.info('name:',self.name) logger.info('dir(self)',dir(self)) raise Exception except Exception as e: # 出错每4秒尝试一次,总共尝试4次 self.retry(exc=e, countdown=4, max_retries=4) return x + y
启动celery
celery -A worker celery_study -l debug -P eventlet
Celery在执行任务时,提供了钩子方法用于在任务执行完成时候进行对应的操作,在Task源码中提供了很多状态钩子函数如:on_success(成功后执行)、on_failure(失败时候执行)、on_retry(任务重试时候执行)、after_return(任务返回时候执行)
方法:通过继承Task类,重写对应方法即可,
from celery import Task class MyHookTask(Task): def on_success(self, retval, task_id, args, kwargs): logger.info(f'task id:{task_id} , arg:{args} , successful !') def on_failure(self, exc, task_id, args, kwargs, einfo): logger.info(f'task id:{task_id} , arg:{args} , failed ! erros: {exc}') def on_retry(self, exc, task_id, args, kwargs, einfo): logger.info(f'task id:{task_id} , arg:{args} , retry ! erros: {exc}') # 在对应的task函数的装饰器中,通过 base=MyHookTask 指定 @shared_task(base=MyHookTask, bind=True) def add(self,x, y): logger.info('add__-----'*10) logger.info('name:',self.name) logger.info('dir(self)',dir(self)) return x + y
启动celery
celery -A worker celery_study -l debug -P eventlet
在很多情况下,一个任务需要由多个子任务或者一个任务需要很多步骤才能完成,Celery也能实现这样的任务,完成这类型的任务通过以下模块完成:
文档:Next Steps — Celery 5.2.0b3 documentation
1.group
urls.py:
path('primitive/', views.test_primitive),
views.py:
from .tasks import * from celery import group def test_primitive(request): # 创建10个并列的任务 lazy_group = group(add.s(i, i) for i in range(10)) promise = lazy_group() result = promise.get() return JsonResponse({'function': 'test_primitive', 'result': result})
说明:
通过task函数的 s 方法传入参数,启动任务
上面这种方法需要进行等待,如果依然想实现异步的方式,那么就必须在tasks.py中新建一个task方法,调用group,示例如下:
tasks.py:
@shared_task def group_task(num): return group(add.s(i, i) for i in range(num))().get()
urls.py:
path('first_group/', views.first_group),
views.py:
def first_group(request): ar = tasks.group_task.delay(10) return HttpResponse('返回first_group任务,task_id:' + ar.task_id)
2.chain
默认上一个任务的结果作为下一个任务的第一个参数
def test_primitive(request): # 等同调用 mul(add(add(2, 2), 5), 8) promise = chain(tasks.add.s(2, 2), tasks.add.s(5), tasks.mul.s(8))() # 72 result = promise.get() return JsonResponse({'function': 'test_primitive', 'result': result})
3.chord
任务分割,分为header和body两部分,hearder任务执行完在执行body,其中hearder返回结果作为参数传递给body
def test_primitive(request): # header: [3, 12] # body: xsum([3, 12]) promise = chord(header=[tasks.add.s(1,2),tasks.mul.s(3,4)],body=tasks.xsum.s())() result = promise.get() return JsonResponse({'function': 'test_primitive', 'result': result})
celery通过flower组件实现管理和监控功能 ,flower组件不仅仅提供监控功能,还提供HTTP API可实现对woker和task的管理
官网:flower · PyPI
文档:Flower - Celery monitoring tool — Flower 1.0.1 documentation
安装flower
pip install flower
启动flower
flower -A celery_study --port=5555
说明:
访问
在浏览器输入:http://127.0.0.1:5555
通过api操作
curl http://127.0.0.1:5555/api/workers
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:mmqy2019@163.com进行举报,并提供相关证据,查实之后,将立刻删除涉嫌侵权内容。
长按识别二维码并关注微信
更方便到期提醒、手机管理