Django使用Celery实现异步发送邮件

前言

在Django使用Celery异步发送邮件的过程中,遇到Celery日志提示任务已接收,但实际上任务并没有执行,解决后特此记录。

使用版本如下:

Django版本:4.1.4

Celery版本:5.2.7

邮箱配置

进入邮箱,找到POP3/SMTP/IMAP项,开启POP3/SMTP服务,添加客户端授权码

-1

Django项目发送邮件

配置邮件服务器

发送邮件时需要配置好SMTP服务器的连接信息。在settings.py中配置邮件服务器信息

  1. # 配置邮件发送
  2. EMAIL_BACKEND = ‘django.core.mail.backends.smtp.EmailBackend’
  3. # 对应邮箱服务器地址
  4. EMAIL_HOST = ‘smtp.163.com’
  5. # 端口
  6. EMAIL_PORT = 25
  7. #发送邮件的邮箱
  8. EMAIL_HOST_USER = ‘admin@163.com’
  9. #在邮箱中设置的客户端授权密码
  10. EMAIL_HOST_PASSWORD = ‘YS22JE123PAZJ2N’
  11. #收件人看到的发件人
  12. EMAIL_FROM = ‘admin<admin@163.com>’

Django发送邮件模块

Django自带了发送邮件的模块django.core.mail,可以方便地使用它来发送电子邮件

send_mail方法描述:

  1. send_mail(subject, message, from_email, recipient_list, html_message=None)
  • subject 主题 邮件标题
  • message 普通邮件正文,普通字符串
  • from_email 发件人
  • recipient_list 收件人列表
  • html_message 多媒体邮件正文,可以是html字符串

基本使用示例:

  1. from django.core.mail import send_mail
  2.  
  3. subject = ‘Subject’ # 主题
  4. message = ‘Message’ # 正文
  5. from_email = ‘noreply@example.com’ # 发件人地址
  6. recipient_list = [‘recipient1@example.com’, ‘recipient2@example.com’] # 收件人地址列表
  7.  
  8. send_mail(subject, message, from_email, recipient_list)

Celery

发送邮箱邮件是耗时操作,所以需要异步发送邮件,使用Celery实现异步任务。

概述

Celery是一个基于python的分布式任务队列,它可以轻松地处理大量的并发任务。Celery支持多种消息传输协议,如AMQP、Redis等,同时也支持多种后端存储系统,如RabbitMQ、Redis等。通过使用Celery,我们可以将一些耗时的任务放到异步的任务队列中,从而提高Web应用的响应速度和性能。

Celery的工作原理非常简单。首先定义一个任务(Task),然后将这个任务加入到任务队列中。Celery Worker会从任务队列中取出任务并执行,完成后将结果返回给调用方。可以根据需要对任务进行优先级排序、设定任务超时时间等。

除了作为任务队列之外,Celery还可以用来实现周期性任务调度,比如定时清理缓存、备份数据库等。Celery提供了丰富的API和插件,可以轻松地完成各种复杂的任务处理需求。

官网:https://docs.celeryq.dev/en/stable/index.html

github:https://github.com/celery/celery

工作模式

默认是进程池方式,进程数以当前机器的CPU核数为参考,每个CPU开四个进程。

指定进程数:

  1. # proj:celery实例对象文件
  2. celery worker A proj concurrency=4

改变进程池方式为协程方式:

  1. # 安装eventlet模块
  2. pip install eventlet
  3.  
  4. # 启用Eventlet,指定协程数目
  5. celery worker A proj concurrency=1000 P eventlet 1000

安装Celery

安装Celery

  1. pip install Celery

Celery的基本使用

创建config.py配置文件

  1. # 设置代理人broker,使用redis的5号库
  2. broker_url = “redis://127.0.0.1/5″
  3. # 设置结果存储,使用redis的6号库
  4. result_backend = “redis://127.0.0.1/6”
  5. # 任务超时限制
  6. task_time_limit = 10 * 60
  7. # 时区
  8. celery_timezone = ‘Asia/Shanghai’

创建Celery实例并加载配置

创建celery_tasks 包,然后创建main.py文件,实现创建Celery实例并加载配置

  1. import os
  2.  
  3. from celery import Celery
  4. from celery_tasks import config
  5.  
  6. # 为celery使用django配置文件进行设置,识别和加载django的配置文件
  7. os.environ.setdefault(‘DJANGO_SETTINGS_MODULE’, ‘项目名.settings’)
  8.  
  9. # 创建celery实例,参数是celery名称,需保证唯一
  10. celery_app = Celery(‘celery_tasks’)
  11.  
  12. # 加载celery配置,设置broker队列
  13. celery_app.config_from_object(config)

定义任务

在包celery_tasks下创建任务包test_task,并在该包下创建tasks.py文件,用于定义任务

  1. from celery_tasks.main import celery_app
  2.  
  3. # bind:保证task对象会作为第一个参数自动传入
  4. # name:异步任务别名
  5. # retry_backoff:异常自动重试的时间间隔 第n次(retry_backoff2^(n-1))s
  6. # max_retries:异常自动重试次数的上限
  7. @celery_app.task(bind=True, name=‘test_task’, retry_backoff=3)
  8. def celerTest(self, number):
  9.      try:
  10.          print(“执行{}号任务”.format(number))
  11.      except Exception as e:
  12.          # 有异常自动重试三次
  13.          raise self.retry(exc=e, max_retries=3)

在celery_tasks.main.py文件种进行任务注册

  1. import os
  2. from celery import Celery
  3.  
  4. # 让celery使用django配置文件,即加载当前工程的配置文件
  5. os.environ.setdefault(“DJANGO_SETTINGS_MODULE”, “demo.settings”)
  6.  
  7. # 创建celery实例,参数是celery名称,需保证唯一
  8. celery_app = Celery(‘demo_celery’)
  9.  
  10. # 加载celery配置,指定配置文件路径,即可设置broker队列
  11. celery_app.config_from_object(‘celery_tasks.config’)
  12.  
  13. # 自动注册celery任务,列表元素是任务包路径
  14. celery_app.autodiscover_tasks([‘celery_tasks.test_task’])

启动Celery服务

  1. # -A 对应的应用程序, 其参数是项目中Celery实例的位置
  2. # worker 要启动的worker
  3. # -l 日志等级,如info等级
  4. celery A celery_tasks.main worker l info
  1. (demo) D:\WorkSpace\Python\demo\demo>celery A celery_tasks.main worker l info
  2.  
  3.      ————– celery@Coding v5.2.7 (dawnchorus)
  4.  ***** —–
  5.  ******* —- Windows1010.0.22000SP0 20230116 23:25:49
  6.  ***  * 
  7.  ** ———- [config]
  8.  ** ———- .> app: demo_celery:0x1fdba863d00
  9.  ** ———- .> transport: redis://127.0.0.1:6379/8
  10.  ** ———- .> results: redis://127.0.0.1/9
  11.  ***  *  .> concurrency: 12 (prefork)
  12.  ******* —- .> task events: OFF (enable E to monitor tasks in this worker)
  13.  ***** —–
  14.      ————– [queues]
  15.                  .> celery exchange=celery(direct) key=celery
  16.  
  17.  
  18. [tasks]
  19.      . test_task
  20.  
  21. [20230116 23:25:49,701: INFO/MainProcess] Connected to redis://127.0.0.1:6379/8
  22. [20230116 23:25:49,703: INFO/MainProcess] mingle: searching for neighbors
  23. [20230116 23:25:50,095: INFO/SpawnPoolWorker1] child process 49036 calling self.run()
  24. [20230116 23:25:50,114: INFO/SpawnPoolWorker2] child process 43196 calling self.run()
  25. [20230116 23:25:50,136: INFO/SpawnPoolWorker3] child process 1284 calling self.run()
  26. [20230116 23:25:50,154: INFO/SpawnPoolWorker4] child process 49708 calling self.run()
  27. [20230116 23:25:50,183: INFO/SpawnPoolWorker5] child process 49704 calling self.run()
  28. [20230116 23:25:50,208: INFO/SpawnPoolWorker6] child process 20884 calling self.run()
  29. [20230116 23:25:50,221: INFO/SpawnPoolWorker7] child process 17840 calling self.run()
  30. [20230116 23:25:50,242: INFO/SpawnPoolWorker8] child process 56040 calling self.run()
  31. [20230116 23:25:50,275: INFO/SpawnPoolWorker9] child process 45968 calling self.run()
  32. [20230116 23:25:50,291: INFO/SpawnPoolWorker10] child process 44888 calling self.run()
  33. [20230116 23:25:50,313: INFO/SpawnPoolWorker11] child process 8848 calling self.run()
  34. [20230116 23:25:50,318: INFO/SpawnPoolWorker12] child process 11020 calling self.run()
  35. [20230116 23:25:50,727: INFO/MainProcess] mingle: all alone
  36. [20230116 23:25:50,740: WARNING/MainProcess] D:\Development\Python\env\demo\lib\sitepackages\celery\fixups\django.py:203: UserWarning: Using settings.DEBUG leads to a memory
  37.              leak, never use this setting in production environments!
  38.      warnings.warn(”’Using settings.DEBUG leads to a memory
  39.  
  40. [2023-01-16 23:25:50,741: INFO/MainProcess] celery@Coding ready.
  41. [2023-01-16 23:25:51,332: INFO/SpawnPoolWorker-13] child process 4580 calling self.run()
  42. [2023-01-16 23:25:51,341: INFO/SpawnPoolWorker-14] child process 44956 calling self.run()
  43. [2023-01-16 23:25:51,453: INFO/SpawnPoolWorker-15] child process 46100 calling self.run()
  44. [2023-01-16 23:25:51,466: INFO/SpawnPoolWorker-16] child process 46872 calling self.run()
  45. [2023-01-16 23:25:52,797: INFO/SpawnPoolWorker-17] child process 2716 calling self.run()
  46. [2023-01-16 23:25:52,800: INFO/SpawnPoolWorker-18] child process 49488 calling self.run()
  47. [2023-01-16 23:25:52,807: INFO/SpawnPoolWorker-19] child process 3912 calling self.run()
  48. [2023-01-16 23:25:53,608: INFO/SpawnPoolWorker-20] child process 40624 calling self.run()

提交任务

  1. from celery_tasks.test_task.tasks import celerTest
  2.  
  3. if __name__ == ‘__main__’:
  4.      for i in range(1,10):
  5.          celerTest.delay(i)

异常

提交任务 Celery控制台日志出现示任务已接收,但并没有执行

INFO/MainProcess] Task send_email[f301b786-af40-4283-a4d6-4a97ae05658f] received
INFO/MainProcess] Task send_email[5997d896-fDB2-4220-92fe-7027291df56d] received

原因:

celery对windows支持不好,需添加组件eventlet 指定协程

解决办法

  1.      pip install eventlet
  1. celery A celery_tasks.main worker l info P eventlet 10

执行1号任务
执行2号任务
执行3号任务
执行4号任务
执行5号任务
执行6号任务
执行7号任务
执行8号任务
执行9号任务

INFO/MainProcess] Task send_email[01457c6c-4571-4b1e-b09c-39df49d50162] received
WARNING/MainProcess] 执行1号任务
INFO/MainProcess] Task send_email[01457c6c-4571-4b1e-b09c-39df49d50162] succeeded in 1.2969999999913853s: None

Celery发送邮件

创建config.py配置文件

  1. # 设置代理人broker,使用redis的5号库
  2. broker_url = “redis://127.0.0.1/5”
  3. # 设置结果存储,使用redis的6号库
  4. result_backend = “redis://127.0.0.1/6”
  5. # 任务超时限制
  6. celery_task_time_limit = 10 * 60
  7. # 时区
  8. celery_timezone = ‘Asia/Shanghai’

创建Celery实例并加载配置

创建定义Celery包:celery_tasks,然后创建main.py文件,实现创建Celery实例并加载配置

  1. import os
  2.  
  3. from celery import Celery
  4. from celery_tasks import config
  5.  
  6. # 为celery使用django配置文件进行设置,识别和加载django的配置文件
  7. os.environ.setdefault(‘DJANGO_SETTINGS_MODULE’, ‘项目名称.settings’)
  8.  
  9. # 创建celery实例,参数是celery名称,需保证唯一
  10. celery_app = Celery(‘celery_tasks’)
  11.  
  12. # 加载celery配置,设置broker队列
  13. celery_app.config_from_object(config, namespace=‘CELERY’)
  14. # celery_app.config_from_object(‘celery_tasks.config’)
  15.  
  16. # 自动注册celery任务,列表元素是任务包路径
  17. celery_app.autodiscover_tasks([‘celery_tasks.email])

定义发送邮件任务

在包celery_tasks下创建任务包email_task,并在该包下创建tasks.py文件,用于定义任务

  1. from django.conf import settings
  2. from django.core.mail import send_mail
  3.  
  4. from celery_tasks.main import celery_app
  5.  
  6.  
  7. # bind:保证task对象会作为第一个参数自动传入
  8. # name:异步任务别名
  9. # retry_backoff:异常自动重试的时间间隔 第n次(retry_backoff2^(n-1))s
  10. # max_retries:异常自动重试次数的上限
  11. @celery_app.task(bind=True, name=‘send_email’, retry_backoff=3)
  12. def sendEmail(self, to_email, verify_url):
  13.      subject = “邮箱验证”
  14.      html_message = ‘<p>尊敬的用户您好!</p>’ \
  15.                      ‘<p>您的邮箱为:%s 。请点击此链接激活您的邮箱:</p>’ \
  16.                      ‘<p><a href=”%s” rel=”external nofollow” >%s<a></p>’ % (to_email, verify_url, verify_url)
  17.      try:
  18.          send_mail(subject, “”, settings.EMAIL_FROM, [to_email], html_message=html_message)
  19.      except Exception as e:
  20.          # 有异常自动重试三次
  21.          raise self.retry(exc=e, max_retries=3)

启动Celery

在使用Celery时,需要启动worker进程来处理异步任务。可以使用以下命令来启动worker进程:

  1. celery A celery_tasks.main worker l info P eventlet 10

调用发送邮件异步任务

定义请求地址

  1. from django.urls import re_path
  2.  
  3. from . import views
  4.  
  5. urlpatterns = [
  6.      re_path(r‘^send/$’, views.SendView.as_view(), name=‘send’),
  7. ]

定义视图,并发送邮件

  1. class SendView(View):
  2.      def get(self, request):
  3.          for i in range(1, 2):
  4.              # 异步发送验证邮件
  5.              verify_url = ‘https://www.admin.com’
  6.              email = ‘admin@qq.com’
  7.              res = sendEmail.delay(email, verify_url)
  8.              print(res)
  9.          return http.jsonResponse({“msg”: “OK”})

以上就是Django使用Celery实现异步发送邮件的详细内容,更多关于Django Celery异步发送邮件的资料请关注我们其它相关文章!

标签

发表评论