Skip to main content

Pywhio

官方 @ PywhiO. Python大数据, AI技术, 微服务, DRF+REACT前后端分离实践.

3 min read · December 6th 2019 (originally published at pywhio)

如何基于celery的chain原语实现任务编排

阅读:-


​ 如何利用celery的chain原语实现任务编排

​ celery是一种非常博大精深的分布方式计算框架,他可以非常方便的实现系统级的任务编排,我们今天就来看看其中的chain原语是如何实现这个功能的。

1575563374185

1 基于celery的ETL方案

1.1 基于Queue中间件方案回顾与讨论

blog3-1- third-party-queu

​ 图 1-1 基于独立Queue方案的ETL方案

在这个方案中,引入了独立的Queue中间件,rabbitmq,获得了一种非常强大的Queue的解决方案。

然而,为了使用复杂的rabbitmq方案,需要调用借助相关的客户端pika, kombu来实现相关的功能。而调用这些客户端,则需要使用一种叫做callback(回调)的编程机制,回调本质上是一种进程间的通信机制,但是实现起来难度比较高。 其中回调的原理,简单说明如下:

blog3-2-callback

​ 图2-2 回调的实现机制

​ 从上面的callback()机制,我们可以看到,相关的机制实现的难度远远比单机的多进程(多线程的通信机制)要困难。如果在大数据的处理情景下,需要实现分布式的进程通信,这就需频繁的使用回调编程,这也是不小的设计难度。

那么我们有没有一种实现方案,既可以实现分布式的多机协作,又可以让应用程序Application Program 的编程变得非常的容易?非常幸运,在python的微服务的设计方案中,有一个很强大的中间件叫做celery。 而celery中的编程,核心是一种面向task的编程模式。

1.1.1 Celery中间件

blog3-3-celery-basic

​ 图 1-3 celery 的编程模型

从上图可见,相关的celery的编程模型中,划分为三个部分,分别是:

1.Producer(celery client): 负责生产相关的task,并发送到一个broker中。

2.broker: 作为一个task的路由器,将task分发到不同的接收者。这里broker实现对rabbitmq的封装,屏蔽了rabbitmq复杂的调用细节。

3.Consumer(celery worker):接收到task以后,负责根据task的信息结构,执行相关的功能。

以上是celery的编程模型,在这种模型下,可以非常方便的实现一种分布式的编程方案。

blog3-4-celery -intro

1.1.2 Celery的task模型

blog3-5-celery -task-model

Celery 最核心的一个概念是task对象模型。

The basic building block in celery is a task。

To declare a task all you need to do is wrap a function with celery’s task decorator。 Now you are ready to run this function locally or remotely and add it to workflows that link together Other similarly wrapped functions。

在芹菜中,最基础的组件是task。

So here by calling delay on the create user task we are instructing celery to run this task remotely 。This means that celery will send a message to in exchange with instructions on which tasks to run and any args and kwargs pass to the function。

As we saw earlier this message will also contain a routing key which routes the message to the appropriate queue based on the exchanges type and its bindings. A celery worker watching a queue will then retrieve the message、 deserialize it、 run the function and store the result in a storage layer such as Redis 。

以下,是celery中缺省的queue模型。

blog3-6-celery -default-queue

1.1.3 Celery 与kombu

Celery 与kombu是两个关系密切的组件,celery通过kombu来实现对rabbitmq的调用。

Kombu is a messaging library for Python.

The aim of Kombu is to make messaging in Python as easy as possible by providing an idiomatic high-level interface for the AMQ protocol, and also provide proven and tested solutions to common messaging problems.

AMQP is the Advanced Message Queuing Protocol, an open standard protocol for message orientation, queuing, routing, reliability and security, for which the RabbitMQ messaging server is the most popular implementation.

kombu的特点是支持多种的符合APMQ协议的消息队列系统。不仅支持原生的AMQP消息队列如RabbitMQ,还支持虚拟的消息队列如redis、mongodb、beantalk、couchdb、in-memory等。

kombu,接口更为简单,而且kombu它支持重新连接策略,连接池,故障转移策略等,且与celery的结合更为紧密。

所以,我们在本章中,将理解,更加简单的celery实现方案。

1.2 基于celery的ETL方案

1.2.1 用例事件模型

下面,将使用celery方案的用例模型做一个简单的阐述

  1. getter扫描临时文件夹(Temp file directory),发现新到达的网点交易文件,将新到达的网点交易文件拷贝到处理文件夹中(handle file directory)。

    2.getter将新到达的网点交易文件事件信息 写入到一个待处理队列hqueue中。

    3.handler 负责监控hqueue中的新文件事件到达的情况,如果有新文件事件到达,则提取相关的文件的URL信息(文件的绝对路径)。

    4.handler 对网点交易文件进行处理,生成结果文件,将结果文件拷贝到装载文件目录下(load file directory)下。

    5.handler将新生成的网点交易结果文件写入到一个待装载队列lqueue中。

  2. loader负责监控hqueue中的新文件事件到达的情况,如果有新文件事件到达,则提取相关的文件的URL信息(文件的绝对路径),处理新到达的结果文件,将结果文件拷贝到数据库中。

  3. 用例执行完毕。

1.2.2 设计约束

在本方案中,我们将使用celery作为分布方式队列的调度方案。

1.3 组件设计

1.3.1 解题思路

存在三个进程getter(采集文件),handler(处理文件),loader(装载文件),我们需要改用celery来调用函数执行任务。可以将处理任务的三个组件分别当做celery的三个worker,我们只需要定义producer来发布任务调度worker来执行任务即可

1.3.2 主要组件介绍

project目录结构图:

blog3-7-package-strcut

主目录(celery_server_split)包含四个子文件:

celeryconfig:celery的配置信息

invoke_task:生产者,调用celeryAPI

pg_conn_cfg:连接postgresql的配置信息

tasks:执行任务的消费者

1.3.2.1 task.py

封装了三个原子函数getter(对文件进行采集)、handler(对文件进行处理)、loader(将文件导入数据库)以及loader函数连接数据库必需要的connect函数,主要对象结构图如下:

blog3-8-task-object

1.3.2.2 invoke_task.py

封装了run函数,根据列表生成式,遍历目标目录下的所有文件,导入celery模块的chain方法,该方法可以将任务链接在一起,将上一个函数执行完毕的返回值当做参数传递给下一个函数:

blog3-9-celery-chain

1.3.2.3 celeryconfig.py

celery配置信息

broker:消息代理, 队列本身. 也称为消息中间件. 接受任务生产者发送过来的任务消息, 存进队列再按序分发给任务消费方(通常是消息队列或者数据库,官方推荐RabbitMQ)

BROKER_URL = 'pyamqp://python:841574470@192.168.223.132:5672//'  

backend:任务处理完成之后保存状态信息和结果(通常储存在数据库)

CELERY_RESULT_BACKEND = 'redis://192.168.223.132/0'  

queue:指定broker在RabbitMQ消息传递的队列

queue = (*Queue*('celery', exchange=Exchange('celery', type='direct'), routing_key='celery'))

route:指定worker接收消息的路由

route = {'*': {'queue': 'celery', 'routing_key': 'celery'}}

1.3.2.4 pg_conn_cfg.py

连接postgresql的配置信息

pg_conn_dict = {  

​                "host": "192.168.223.132",  

​                "database": "bank",  

​                "user": "username",  

​                "password": "841574470",  

​                "port": "5432"  
}

1.4 组件间交互关系

1.4.1 解题方案

blog3-10-solution-view

  1. 根据需求,我们在tasks.py中导入celery,创建celery的实例对象app,导入celery的配置信息(broker和backend),给getter、handler、loader三个函数添加装饰器,实现异步调度的功能,作为celery的worker

  2. 在invoke_task.py中,创建run函数作为producer,利用列表推导式获取目标路径下所有文件列表,然后对文件列表进行遍历,调用chain方法发起异步执行三个函数getter()、handler()、laoder()的请求到broker,worker接收到消息开始执行任务,并将执行结果储存在Redis,流程结束。

blog3-11-chain-struct

​ 图1: chain()调用的处理过程。

  1. 在终端中输入命令启动worker执行任务
    celery -A tasks worker -l info

tasks代表worker所在的文件名, -l info代表以普通模式启动,启动成功终端截图

blog3-12-celery-start

终端输出celery的配置信息、通信的queues、执行任务的worker等信息

  1. 运行run函数,producer发送任务到broker,worker接收broker的消息开始执行任务,开始执行任务终端输出信息为:

输出信息会记录每个worker执行任务的结果、消耗的时间、任务的返回值

1.4.2 组件调用流程

blog3-13-callstack-procedure

chain()方法将三个原子函数链接在一起顺序进行调度

  1. chain()向broker发布getter任务

  2. getter接收到消息开始执行务

  3. getter将函数执行完毕的返回值返回给broker(待处理文件的绝对路径)

  4. chain()得到待处理文件的绝对路径

  5. chain()向broker发布handler任务

  6. handler接收到消息开始执行任务

  7. handler将函数执行完毕的返回值返回给broker(待装载文件的绝对路径)

  8. chain()得到待装载文件的绝对路径

  9. chain()向broker发布loader任务

  10. loader接收到消息开始执行任务,流程完毕

备注:此处chain()函数负责整合三个原子函数,实现服务之间的整合。

chain(getter.s(file, handle_directory), handler.s(load_directory, field), loader.s(table)

1.4.3 组件之间调用关系

blog3-14-compoent-interworking

run函数遍历filelist,执行chain函数。chain首先调用apply_async()函数异步执行getter,getter执行完毕得到返回值handle_file_path(待处理文件的绝对路径),chain得到返回值调用apply_async()函数异步执行handler,handler执行完毕得到返回值load_file_path(待装载文件的绝对路径),chain得到返回值调用apply_async()函数异步执行loader,流程结束

1.5 设计关键点考虑

1.5.1 原子函数返回值

由于getter、handler原子函数都具有返回值,那么在使用delay()异步调用函数的时候,如何获取到原函数的返回值呢?我们先尝试输出三个原子函数异步调用的返回值:

blog3-15-return-value-func

任务执行成功Redis储存结果:

blog3-16-result-in-redis

比对结果得知异步调用函数的返回值就是该任务执行成功储存在backend的执行结果,查阅celery文档与执行结果相关的result类,有以下方法和属性:

  1. result.get()等待异步任务执行完毕,返回原函数的返回值。对三个异步调用函数的返回值result调用get()方法:

blog3-17-get-function

输出结果为:

1).getter函数返回值(待处理文件绝对路径),

2).handler函数返回值(待装载文件绝对路径),

3). loder 函数成功载入数据到数据库返回一个‘success’字符串,所以输出success

  1. result.ready()获取异步函数执行结果(已执行返回True,未执行或正在执行返回False),调用get()方法等待函数执行完毕,在get方法调用前后分都调用ready()方法获取函数执行结果:

blog3-18-ready-function

  1. result.status获取异步函数的执行状态,分别具有五种状态:PENDING(待处理)、STARTED(已开始)、RETRY(重试)、FAILURE(失败)、SUCCESS(成功),在get()函数调用前后获取异步函数status属性:

blog3-19-status-function

1.5.1.1 对象result提供的方法

关于result的相关操作方法归纳如下:

blog3-20-result-object

1.5.2 异步调用方案讨论

1.5.2.1 delay().get()方案的不足

blog3-21-blosck-sync-call

由于get存在阻塞,在run函数for循环中顺序执行异步调用函数,然后get获取返回值,然后下一次循环任务才会进行。如果某一个任务执行失败,没有取到结果,那么函数就一直在get这个函数的执行结果,从而影响后面函数的执行,进而影响for循环中后面文件的采集处理,这样的方案显然是不合理的,所以每一个原子函数应该只负责自己下一步函数的异步调用。

1.5.3 异步调用非阻塞方案

1.5.3.1 侵入式调用异步函数

blog3-22-async-inject-call

阻塞存在的原因是因为要获取上一个异步函数执行的返回值,如果我们在原子函数执行完成之后发起函数的异步调用,这样就不存在阻塞了,但是入侵了原子函数的代码,所以这个方案也是不可行的,因为会导致后续的代码维护比较困难。

1.5.3.2 非侵入式调用异步函数

​ 如果想解决这个问题,我们就考虑设计一种非侵入方式的异步调用,其中包含了需要借助celery提供的chain()原语。

1.5.4 chain方案

  1. 关于chain

查阅celery文档,在Designingwork-flows章节,提供了一个类chain(),chain继承自它的私有类_chain(),_chain封装了一个apply_async方法发起执行异步函数的请求。chain会执行第一个任务,将其返回值传递给链中的下一个任务,依次类推,这样就算后续程序执行失败也不会影响循环中下一个文件的采集处理。它还设置parent属性,可以沿着链条向上移动以获得中间结果,打印parent属性:

print(result.parent.get())

print(result.parent.parent.get())

blog3-23-result-parent

2.对chain进行抓包

在终端不运行celeryworker的情况下,调用chain函数向RabbitMQ发送请求,我们在management页面对请求进行抓包结果如下:

blog3-24-payload-rabbitmq

抓包结果分析

抓包页面中Properties选项显示了本次抓包的各项属性,argsrepr显示本次传递给getter函数的两个参数,task为执行本次任务worker的名称,等等。

在Payload选项中存在一个大列表,对大列表进行整理结果如下:

blog3-25-dict-message

  1. Payload载荷分析

函数执行debug如下所示:

blog3-26-payload-args

函数debug运行获取的变量信息与payload的参数相对比:

列表中第一项是传递给getter函数的两个参数,然后就是chain的这个键值对中,value存放了chain后面调用的两个函数各种属性,args为函数所需参数,task_id与函数执行完成后redis的id相比对结果相同,task为worker的名称。

1.5.5 backend异步任务直接结果分析

1.5.5.1 Redis中查看结果

异步任务执行成功结果储存在Redis中,在Redis中输入命令‘keys*’查看所有键:

blog3-27-redis-key

与worker中的task_id相比对:

blog3-28-task-id

比对结果相同,再通过redis的键取对应值:

blog3-29-redis-key-value

值为本次task执行的执行结果、任务状态、结束时间等参数

1.5.5.2 flower插件查看结果

Celery提供了一个工具flower,将各个任务的执行情况、各个worker的健康状态进行监控并以可视化的方式展现。

  1. 首先终端中下载flower插件pip install flower

  2. 指定broker并启动

celery flower --broker=amqp://guest:guest@localhost:5672//
  1. 访问 http://localhost:5555/ 即可,点击task选项即可获取表格形式的任务名称、UUID、状态、参数、返回值、起始时间、运行时间等参数:

blog3-30-celery-flower

1.6 总结

现在我们演示了,如何通过celery的微服务方案。

基于celery的微服务,有以下的优点:

1575562122343

1.无需理解复杂的rabbitmq方案,直接配置broker就可以。

2.通过装饰器的方法,在不修改原子函数的情况下,实现了对方法异步调度的能力。

3.关于celery的管理,采用集中配置的方案,易于理解。

虽然,限于篇幅的关系,没有深入的讲究celery的特性,相关的内容在后续的case中会进一步介绍。

分享到微博

Previous

如何基于分段接收实现socket协议远程文件传输
Starter
MicroServ
Tutorials
Blog