Skip to main content

Pywhio

官方 @ PywhiO. Python大数据, AI技术, 微服务, DRF+REACT前后端分离实践.

3 min read · December 22nd 2019 (originally published at pywhio)

如何利用celery处理Email

阅读:-


​ 如何利用celery的chain原语实现海量邮件的处理

​ celery是一种非常博大精深的分布方式计算框架,他可以非常方便的实现系统级的任务编排,我们今天就来看看其中的chain原语是如何实现这个功能的。

1575563374185

1 如何利用celery实现海量邮件的处理

1.1 基于celery的ETL方案

1.1.1 用例故事

clip_image001

​ 在大数据处理的环节中,我们需要有能力同时处理结构化数据和非结构化数据。 很多时候需要把海量数据同时传输到关系型数据库和非关系型数据库中,在这种情况下,往往需要有针对任务编排的能力。此时如何合理的进行任务编排,在开否的时候就显得至关重要,在本案例中,我们需要将52万封邮件,同时装载到数据库和ES中,下面看看在现实环境中是如何实现的。

1.1.2

1、 用户将待测试的邮件拷贝到文件夹下。

2、用户启动ETL数据处理过程。

3、timer 首先发送一个email解析的任务,其中携带了email文件的路径,传送到相应的queue中。

4、解析器接从queue中提取这个task, 获得一封email,解析邮件内容,提取其中的信息结构,形成一个数据集。并返回一个字典对象。

5、timer 接收到邮件解析完毕以后的字典对象以后, 同时发出两个task,一个task是将数据字典拷贝到关系型数据库中,另外一个task是将数据字典拷贝到搜索服务器ES中去。

6、关系型数据库装载器从queue中接收到这个任务,将数据拷贝到关系型数据库中。

7、ES装载器从queue中接收到对应的任务,将数据拷贝到ElasticSearch服务器中。

8、三个任务均执行完毕以后,一封邮件处理完毕。

设计约束

在本方案中,我们将使用celery作为分布方式队列的调度方案。

1.2 组件设计

1.2.1 解题思路

分析用例事件模型,我们可知邮件的处理要经过三个阶段,分别是邮件的解析、解析结果导入关系型数据库、解析结果导入elasticsearch服务器,运用celery分布式任务队列来完成这一过程。我们可以创建三个worker来分别处理这一过程中的三个阶段,handle负责邮件的解析,load_es负责导入结果到elasticsearch服务器,load_pg负责导入解析结果到postgresql

1.2.2 主要组件介绍

project目录结构图:

clip_image002

主目录(parse_email)包含四个子文件:

celeryconfig:celery的配置信息

invoke_task:生产者,调用celeryAPI

config:连接postgresql、elasticsearch的配置信息

tasks:执行任务的消费者

1.2.2.1 tasks.py

封装了三个原子函数handle(对邮件进行解析)、load_pg(导入解析结果到pg)、load_es(导入解析结果到es服务器)以及load_pg函数连接数据库必需要的connect函数,主要对象结构图如下:

clip_image003

1.2.2.2 invoke_task.py

封装了publish函数,根据os.waik()函数,遍历目标目录与子目录下的所有文件,导入celery模块的chain()与group()方法,chain()方法可以将任务链接在一起,将上一个函数执行完毕的返回值当做参数传递给下一个函数,group()方法可以同时调用两个函数,chain首先调用handle()函数,然后把handle函数返回值传给group同时调用load_pg和load_es:

clip_image004

1.2.2.3 celeryconfig.py

celery配置信息

broker:消息代理, 队列本身. 也称为消息中间件. 接受任务生产者发送过来的任务消息, 存进队列再按序分发给任务消费方(通常是消息队列或者数据库,官方推荐RabbitMQ)

BROKER_URL = ‘pyamqp://python:841574470@192.168.223.132:5672//’

backend:任务处理完成之后保存状态信息和结果(通常储存在数据库)

result_backend = ‘cache+memcached://127.0.0.1:11211/’

queue:指定broker在RabbitMQ消息传递的队列

queue = (

​ Queue(‘handle’, exchange=Exchange(‘handle’, type=‘direct’), routing_key=‘handle’),

​ Queue(‘load_es’, exchange=Exchange(‘load_es’, type=‘direct’), routing_key=‘load_es’),

​ Queue(‘load_pg’, exchange=Exchange(‘load_pg’, type=‘direct’), routing_key=‘load_pg’),

)

route:指定worker接收消息的路由

route = {

​ ‘handle’: {‘queue’: ‘handle’, ‘routing_key’: ‘handle’},

​ ‘load_es’: {‘queue’: ‘load_es’, ‘routing_key’: ‘load_es’},

​ ‘load_pg’: {‘queue’: ‘load_pg’, ‘routing_key’: ‘load_pg’},

}

1.2.2.4 config.py

连接postgresql、elasticsearch的配置信息

pg_conn_param = {

​ “host”: “192.168.223.132”,

​ “database”: “bank”,

​ “user”: “username”,

​ “password”: “841574470”,

​ “port”: “5432”

​ }

Elasticsearch_param = [{‘host’: ‘localhost’, ‘port’: 9200}]

1.3 组件间交互关系

1.3.1 中间件准备

1.3.1.1 数据库表的创建

  1. 由于需要将邮件解析后的结果导入到pg,所以我们要创建一个表email来存储邮件的解析结果,表的相关字段信息结构如下:

    clip_image005

  1. email表以邮件的message-id为主键,所有字段的类型为text

1.3.1.2 memcached的安装

  1. 本次案例中celery的异步结果储存在mc中,所以同样的在docker中安装memcached,首先拉取镜像:

docker pull Memcached

  1. 启动容器运行memcached,将11211端口映射在本机上:

docker run -p 11211:11211 —name memcache memcached

  1. 测试memcached是否安装成功,通过Telnet方式连接memcached
telnet 127.0.0.1 11211

clip_image006

设置值,age是key,0是标志位,900是生命周期,8代表所占字节数,回车之后的22是value

1.3.2 解题方案

  1. 根据需求,我们在tasks.py中导入celery,创建celery的实例对象app,导入celery的配置信息(broker和backend),给handle、load_es、load_og三个函数添加装饰器,实现异步调度的功能,作为celery的worker

  2. 在invoke_task.py中,创建publis函数作为producer,利用os.walk()函数获取目标目录以及子目录所有文件,然后对文件进行遍历,调用chain方法发起异步执行函数broker,chain首先调用handle()函数,然后把handle函数返回值传给group同时调用load_pg和load_es执行,worker接收到消息开始执行任务,并将执行结果储存在memcached,流程结束。

    clip_image007

​ 图1: chain()与group()调用的处理过程。

  1. 在终端中输入命令启动三个worker分别执行handle、load_pg、load_es三个任务

celery -A tasks worker -l info -n handle.%h -Q handle

celery -A tasks worker -l info -n load_es.%h -Q load_es

celery -A tasks worker -l info -n load_pg.%h -Q load_pg

tasks代表worker所在的文件名, -l info代表以普通模式启动,—concurrency代表开启两个进程来执行这个任务,-n指定worker名,-Q指定队列名

handle启动成功终端截图

clip_image008

load_pg启动成功截图:

clip_image009

load_es启动成功截图:

clip_image010

终端输出celery的配置信息、通信的queues、执行任务的worker、同步的节点等信息

  1. 运行publish函数,chain和group发送任务到broker,worker接收broker的消息开始执行任务,开始执行任务终端输出信息为:

handle输出信息:

clip_image011

load_pg输出信息:

clip_image012

load_es输出信息:

clip_image013

输出信息会记录每个worker执行任务的结果、消耗的时间、任务的返回值等,其中load_es在执行中还发起了post请求,记录了请求的地址响应状态码,请求时间。所以我们可以得出结论:导入数据到es服务器这一过程就是再向es服务器发起post请求,尝试访问一下post请求链接:

clip_image014

请求页面为json字符串

  1. 1.3GB邮件全部解析完毕后,数据库查询解析总量:

    clip_image015

elasticsearch查询解析总量:

clip_image016

对比解析结果总数一致,故程序顺利执行无异常。

1.3.3 组件调用流程

​ 图2: 组件调用流程图

chain()方法顺序调度handle和group函数,group()函数同时调度load_es和load_pg

  1. chain()向broker发布handle任务

  2. handle接收到消息开始执行务

  3. handle将函数执行完毕的返回值返回给broker(邮件解析完的字典类型数据)

  4. chain()得到dict

  5. 通过group()函数,chain()向broker同时发布load_es和load_pg任务

  6. load_es和load_pg接收到消息开始执行任务,流程完毕

备注:此处chain()与group()函数负责整合三个原子函数,实现服务之间的整合。

chain(handle.s(path), group(load_es.s(index, doc_type), load_pg.s()))()

1.3.4 组件之间调用关系

clip_image018

图3: 组件调用关系图

publish函数遍历filelist,对每个文件的绝对路径执行chain函数。chain首先调用apply_async()函数异步执行handle,handle执行完毕得到返回值dict(邮件解析后的字典),chain得到返回值通过group函数同时调用apply_async()函数异步执行load_es和load_pg,流程结束

1.3.5 原子函数的构建

1.3.5.1 handle()

clip_image019

​ 图 4: handle解析流程图

  1. handle函数负责解析邮件返回一个解析结果的dict

  2. 首先根据传入文件的绝对路径,创建读文件的句柄,导入email模块,调用email.message_from_file()函数对邮件初步解码,得到初步解码对象msg,输出msg对象:

    clip_image020

  3. 对msg调用items()方法获取所有消息的字段和值,以列表嵌套元祖的形式顺序展现出来,得到message_data,输出message_data:

    clip_image021

  4. 对msg调用get_payload()方法获取邮件的正文,得到message_payload

  5. 遍历列表message_data,将列表中的每个元祖通过索引取值转换为键值对形式的字段对象message_data_dict,然后添加payload键值对

  6. 由于pg数据表部分字段不能大写,所以这里再将message_data_dict的所有键调用lower()函数全部转换为小写,最后返回新的字典对象new_dict

1.3.5.2 load_pg()

  1. load_pg 负责将邮件的解析数据导入postgresql

  2. 导入pandas模块,调用dataframe()方法将dict转换为dataframe对象

  3. 调用to_sql方法将df对象导入pg数据库

1.3.5.3 load_es()

  1. load_es 负责将邮件的解析数据导入elasticsearch服务器

  2. 导入elasticsearch下的Elasticsearch类,根据连接elasticsearch服务器的参数Elasticsearch_param,创建Elasticsearch类的实例对象es

  3. 根据实例对象es调用index方法向elasticsearch服务器导入数据,需要传入index, doc_type, body,id四个参数,其中body为需要上传的数据,id在本次案例中,由于邮件解析的message-id具有唯一性,所以将id的值设置为message-id

1.4 设计关键点考虑

1.4.1 backend的选择

在本次案例中,由于需要解析的数据量巨大,我们需要对数据进行高频率的解析与读取,所以对本次案例中使用的各种中间件的性能要求非常高,例如作为celery分布式队列的broker,本次案例中我们选择的是官方首选推荐的RabbitMQ,毫无疑问,它的表现是非常出色的。

clip_image022

图5: handle在delay了35W个任务,仍然以接近平均速率解析

但是作为任务执行的状态结果,官方推荐了许多种后端SQLAlchemy / Django ORM, Memcached,Redis等等,我们不妨逐一来尝试一下

1.4.1.1 Redis结果后端

  1. redis作为一款开源的数据结构服务器,它具有优异的性能,所以RabbitMQ + Redis的组合使用最为流行

result_backend = ‘redis://localhost/0’

任务执行成功Redis储存结果:

clip_image023

  1. 但是,在本次案例中高频率写入结果到Redis,进行测试当任务进行到两万个左右就会抛一个出redis写入socket失败的错误:

redis.exceptions.ConnectionError: Error 104 while writing to socket.

  1. 所以redis比较适合低频率执行任务的项目,类似本次案例这种高频的就不合适了

1.4.1.2 sqlalchemy关系型数据库结果后端

  1. 使用SQLAlchemy也是一种作为后端的方式,result_backend必须使用连接URL和db+ 前缀配置设置:

result_backend = ‘db+postgresql://username:password@localhost/mydatabase’

  1. 将SQLAlchemy配置为结果后端时,Celery自动创建两个表来存储任务的结果元数据。此设置可以自定义表名称:

database_table_names = {

​ ‘task’: ‘myapp_taskmeta’,

​ ‘group’: ‘myapp_groupmeta’,

}

  1. 进行测试,程序能正常运行无任何报错,查看任务执行结果:

    clip_image024

  2. 但是查看RabbitMQ管理页面,发现load_pg任务 delay最严重,且效率低:

    clip_image025

这是由于本次案例邮件解析结果和任务执行结果都往pg里面存,这就造成了同一时刻连接数据库读写的阻塞,所以pg后端在本次案例中是可行的,但会影响效率

1.4.1.3 memcached结果后端

  1. Memcached是一套开源、分布式的高速缓存系统,通过缓存数据库查询结果提高效率,还能进行分布式存储,将资料分散在不同机器上。

  2. 使用单个memcached服务器:

result_backend = ‘cache+memcached://127.0.0.1:11211/’

  1. 进行测试程序正常运行且无报错信息,查看执行结果

    clip_image026

  2. RabbitMQ传递消息速率相比pg较快:

    clip_image027

1.4.2 异步调用方案

  1. 关于group

在本次案例中,我们调用handle对邮件进行解析,然后将解析的结果dict作为返回值传给load_es和load_pg,很明显,这是一个链式调用,我们可以通过chain()方法首先调用handle函数,再将handle函数的返回值传给下一个,但是用例中要求同时调用load_es和load_pg,这就要用到group()方法了

  1. 在celery文档有关chain的介绍之后,还有一个group函数,group函数作为一个组,采用签名列表,一个组可用于并行执行多个任务。如果您呼叫该小组,则这些任务将在当前流程中一个接一个地应用,并GroupResult 返回一个实例,该实例可用于跟踪结果或告知已准备好多少个任务。组是签名对象,因此可以与其他签名结合使用。所以我们可以将chain和group结合起来使用,用chain链式调用handle和group,用group同时调用load_es和load_pg
  1. 组任务也返回一个特殊的结果,该结果与普通任务结果一样工作,只是它在整个组中都起作用,对它们进行操作,就好像它是一个单一的任务。它支持一下操作:

· successful()

返回True如果全部顺利完成子任务(例如,没有提出一个例外)。

· failed()

True如果任何子任务失败,则返回。

· waiting()

True如果任何子任务尚未准备就绪,则返回。

· ready()

True如果所有子任务都准备就绪,则返回。

· completed_count()

返回完成的子任务数。

· revoke()

撤消所有子任务。

· join()

收集所有子任务的结果,并以与调用它们相同的顺序(作为列表)返回它们。

clip_image028

​ 图6:group类图

4.对chain进行抓包(handle未存在?)

在终端不运行celeryworker的情况下,调用chain函数向RabbitMQ发送请求,我们在management页面对请求进行抓包结果如下:

clip_image029

  1. chain抓包结果分析

抓包页面中Properties选项显示了本次抓包的各项属性,argsrepr显示本次传递给getter函数的两个参数,task为执行本次任务worker的名称,等等。

在Payload选项中存在一个大列表,对大列表进行整理结果如下:

clip_image030

载荷储存了函数执行的参数,链式调用所要执行的函数及其参数,task_id等参数

  1. 对group进行抓包(load_pg,load_es)

运行handle函数的worker,分别在load_es和load_pg抓取到了数据包,类型结构基本与chain抓取到的一样

clip_image031

clip_image032

clip_image033

  1. load_es、load_pg抓包结果整理

    clip_image034

    clip_image035

1.4.3 backend异步任务直接结果分析

1.4.3.1 flower插件查看结果

Celery提供了一个工具flower,将各个任务的执行情况、各个worker的健康状态进行监控并以可视化的方式展现。

  1. 首先终端中下载flower插件pip install flower

  2. 指定broker并启动

celery flower —broker=amqp://guest:guest@localhost:5672//

  1. 开启worker后访问 http://localhost:5555/,首页展现本次开启的的三个worker

    clip_image036

  2. 发布任务后点击task选项即可获取表格形式的任务名称、UUID、状态、参数、返回值、起始时间、运行时间等参数:

    clip_image037

  3. 根据右上角的search搜索框,输入值进行搜索可以查询结果匹配的任务,我们可以根据任务的hash值来查询任务:

1.4.4 memcached介绍

1.4.4.1 memcached内存原理(提炼)

1.memcached的内存分配默认是采用了Slab Allocator的机制,将所有的输出分散存储在一个个slab中

  1. memcached Slab Allocator内存分配机制预先将内存分割成尺寸相同(默认是1MB)的Slab class1、Slab class2、… …、Slab classN,再将Slab class分成一个个尺寸相同的Chunk,Chunk是用于存放缓存记录的最小单位

    clip_image038

  2. 假如一个数据大小为100b, memcached会选择将它储存在能容下100B的最小chunk来存储它,这样空间的利用达到了最大化,保证chunk size>= N

    clip_image039

1.4.4.2 memcached查询命令介绍

  1. stats items: 显示各个 slab 中 item 的数目和存储时长(最后一次访问距离现在的秒数),8为slab编号,number为数量48个

clip_image040

  1. stats cachedump slabs_id limit_num 查询slab的所有项,num代表返回的数量,0则返回所有数据

    clip_image041

  2. 通过查询的项的id查询值

    clip_image042

1.4.5 memcached持久化处理

1.4.5.1 python Memcache包的介绍

memcache封装了python连接memcached客户端的接口,利用python代码来操作memcached,memcache封装了client类,client类封装了一系列的操作memcache的指令方法

set 设置一个值,若存在则覆盖

get 查询键对应值的内容

add 设置一个值,若存在则报错

replace 修改值,若不存在则返回false

delete 删除一个值

get_slab 查询所有slab

get_stats 查询slab的项

1.4.5.2 memcached持久化(流穿越、set)

  1. Memcached持久化主要分为两个阶段,第一查询mc服务器的所有键,以列表形式返回所有键;第二根据所有键的列表,遍历列表,利用键来查询值,然后将值插入数据库

  2. 创建获取所有键的函数get_mc_keys,传入连接memcached参数创建实例对象mc,调用get_slab方法得到所有slab

    clip_image043

  3. 通过字典取值获取所有键等方式获取所有的slab编号,遍历slab编号,调用get_stats方法查询对应编号下的所有key,将key添加到keylist

    clip_image044

​ 图7: 函数get_mc_keys流穿越过程

  1. 创建导入结果的result_to_pg函数,遍历key_list,先查询key是否存在task_result表中,如果存在就不导入结果

  2. 调用get方法查询key 的值,由于查询到的值为json字符串,调用json.load方法将json字符串转化为dict

6.由于mc转化为pg持久化mc中有部分字段不需要,所以遍历字典,删除掉我们不需要的字段

7.由于我们的最终目的是将result持久化,即通过每个文件独有的message-id来获取文件的各个任务执行情况,所有我们在load_es和load_pg返回了message-id,这样在持久化之后,与之对应的数据表中result字段储存了该任务的message-id,但是handle返回值是一个大字典,所以我们对返回值进行判断,如果为dict类型,就将其赋值为message-id

  1. 导入pandas模块,调用dataframe方法将dict转化为dataframe对象,调用to_sql方法将dataframe导入数据库

    clip_image045

​ 图8: 函数result_to_pg流穿越过程

分享到微博

Previous

如何基于celery的chain原语实现任务编排
Starter
MicroServ
Tutorials
Blog