Skip to main content

基于celery的ETL方案

9 基于celery的ETL方案

9.1 基于Queue中间件方案回顾与讨论

9 1 third party queue Based ETL solution

​ 图 9-1 基于独立Queue方案的ETL方案

在这个方案中,引入了独立的Queue中间件,rabbitmq,获得了一种非常强大的Queue的解决方案。

然而,为了使用复杂的rabbitmq方案,需要调用借助相关的客户端pika, kombu来实现相关的功能。而调用这些客户端,则需要使用一种叫做callback(回调)的编程机制,回调本质上是一种进程间的通信机制,但是实现起来难度比较高。 其中回调的原理,简单说明如下:

9 2 the principle of callback

​ 图 9-2 回调的实现机制

从上面的callback()机制,我们可以看到,相关的机制实现的难度远远比单机的多进程(多线程的通信机制)要困难。如果在大数据的处理情景下,需要实现分布式的进程通信,这就需频繁的使用回调编程,这也是不小的设计难度。

那么我们有没有一种实现方案,既可以实现分布式的多机协作,又可以让应用程序Application Program 的编程变得非常的容易?非常幸运,在python的微服务的设计方案中,有一个很强大的中间件叫做celery。 而celery中的编程,核心是一种面向task的编程模式。

9.1.1 Celery中间件

9 3celery-basic

​ 图 9-3 celery 的编程模型

从上图可见,相关的celery的编程模型中,划分为三个部分,分别是:

1,Producer(celery client): 负责生产相关的task,并发送到一个broker中。

2,broker: 作为一个task的路由器,将task分发到不同的接收者。这里broker实现对rabbitmq的封装,屏蔽了rabbitmq复杂的调用细节。

3,Consumer(celery worker):接收到task以后,负责根据task的信息结构,执行相关的功能。

以上是celery的编程模型,在这种模型下,可以非常方便的实现一种分布式的编程方案。

9 4 celery intro

9.1.2Celery的task模型

9 5 celery task model

Celery 最核心的一个概念是task对象模型。

The basic building block in celery is a task。

To declare a task all you need to do is wrap a function with celery’s task decorator。 Now you are ready to run this function locally or remotely and add it to workflows that link together Other similarly wrapped functions。

在芹菜中,最基础的组件是task。

So here by calling delay on the create user task we are instructing celery to run this task remotely 。This means that celery will send a message to in exchange with instructions on which tasks to run and any args and kwargs pass to the function。

As we saw earlier this message will also contain a routing key which routes the message to the appropriate queue based on the exchanges type and its bindings. A celery worker watching a queue will then retrieve the message、 deserialize it、 run the function and store the result in a storage layer such as Redis 。

以下,是celery中缺省的queue模型。

9 6 celery default queue

9.1.3Celery 与kombu

Celery 与kombu是两个关系密切的组件,celery通过kombu来实现对rabbitmq的调用。

Kombu is a messaging library for Python.

The aim of Kombu is to make messaging in Python as easy as possible by providing an idiomatic high-level interface for the AMQ protocol, and also provide proven and tested solutions to common messaging problems.

AMQP is the Advanced Message Queuing Protocol, an open standard protocol for message orientation, queuing, routing, reliability and security, for which the RabbitMQ messaging server is the most popular implementation.

kombu的特点是支持多种的符合APMQ协议的消息队列系统。不仅支持原生的AMQP消息队列如RabbitMQ,还支持虚拟的消息队列如redis、mongodb、beantalk、couchdb、in-memory等。

kombu,接口更为简单,而且kombu它支持重新连接策略,连接池,故障转移策略等,且与celery的结合更为紧密。

所以,我们在本章中,将理解,更加简单的celery实现方案。

9.1.4RabbitMQ 方案简介

由于RabbitMQ是在金融系统中广泛使用的一款中间件,所以我们要对其中的基本结构做一个简单的介绍。

1, 生产者(producer)与消费者(consumer)

70 RabbitMQ Architecture

​ 图 7-2 RabbitMQ的基础架构

这里有两个重要的对象,分别是producer,和Consumer

Procucer:it send messages to the Exchange.

Consumer: it receive messages from the Queue.

2、队列(Queue)

71 Queue

​ 图 7-3 Queue模型

​ Queue 是存在于rabbit的broker中的对象,他会缓存消息,等待消费者来提取。他也遵循基本的先进先出(FIFO)的模式。

而Queue 作为一个对象,他有自己的属性。常用的属性包括:

73 Queue 的属性

1,name: 最常见的属性,说明Queue的名字。

2、Durable:durable meaning that it will persist if the broker restarts or crashes。 说明消息是否需要持久化。

3、Exclusive:是否是独占的Queue。 queue can only be used by one in only one connection。

4、Auto-Delete: once a queue has been used, it is destroyed once the last consumer disconnects from it.

3, 路由器 exchange:

exchange exchanges are objects that accept messages from producers and then route those messages to the appropriate Queues。

In RabbitMQ, it is important to note that producer do not directly send messages to Queues. Instead exchanges are responsible for routing messages to the appropriate Queues. producers then send those messages to exchanges.

Exchange是一个对象,他负责从生产者来接收消息,并把这些消息路由到合适的Queue中。请大家注意,在RabbitMQ中,生产者是不能把消息直接发送到Queue中。实际的情况是,需要透过Exchange来将消息路由到合适的Queue中。生成者仅仅需要将消息发送到Exchange中就对了。

那么exchange是如何连接到一个Queue呢, 那么还需要一个binding的操作。相关的内容后续会专案介绍。以上是关于RabbitMQ的介绍,后面再试验中会用到相关的概念。

9.2 基于celery的ETL方案

9.2.1 用例事件模型

下面,将使用celery方案的用例模型做一个简单的阐述

1、 getter扫描临时文件夹(Temp file directory),发现新到达的网点交易文件,将新到达的网点交易文件拷贝到处理文件夹中(handle file directory)。

2,getter将新到达的网点交易文件事件信息 写入到一个待处理队列hqueue中。

3,handler 负责监控hqueue中的新文件事件到达的情况,如果有新文件事件到达,则提取相关的文件的URL信息(文件的绝对路径)。

4,handler 对网点交易文件进行处理,生成结果文件,将结果文件拷贝到装载文件目录下(load file directory)下。

5,handler将新生成的网点交易结果文件写入到一个待装载队列lqueue中。

6, loader负责监控hqueue中的新文件事件到达的情况,如果有新文件事件到达,则提取相关的文件的URL信息(文件的绝对路径),处理新到达的结果文件,将结果文件拷贝到数据库中。

7, 用例执行完毕。

9.2.2设计约束

在本方案中,我们将使用celery作为分布方式队列的调度方案。

9.3设计模型

下面对相关方案的设计模型,做一个简单的介绍

9 7 object view of celery ETL

​ 图 9-4 celery_main 的组件模型

从上图可见,相关的celery_main 的组件模型,简单说明如下:

1, variables: 提供了和数据库相关的一些配置信息结构。

2、procfunc: 提供了具有celery特性的原子处理组件, 相关的组件可以基于celery的框架进行异步的调度处理。

3 celery config: 提供了相关的celery运行相关的一些配置。

9.3.1 Procfunc的改造方案

由于引入了celery方案,所以对于原子处理模块procfunc做一下相关的改造设计。

9 8 object view of procfuc

​ 图 9-5 Object view of procfuc

在相关的方案中,对procfunc做了改造。

1, 创建了一个实例app,他是由类celery()实例化而来。具备了类celery()的各种方法。

2,对原子操作getter(), handler(),loader() 进行了装饰器改造, 扩展了app.task() 方法,返回的装饰器对象的方法进行了扩展。

3,类 celery()的一个实例app提供了一个app.configformobject()的方法,这样可以实现对app的集中配置能力。

9.3.2 Celeryconfig 组件介绍

在上一章节中,我们介绍了celery()的一个实例app提供了一个app.configformobject()的方法,实现了对app的集中配置能力。

下面我们看看相关的配置方案。

9 9 task route

​ 图 9-6 celeryconfig组件

在相关的方案中:集中配置了各类对象。 这样应用起来就非常的方便。主要的配置对象:

1, Queue: 配置了用于分布式,通信的queue的信息。

2,task_dict: 配置了task相关的信息。

3,task(router):配置了task和queue之间的映射关系。

备注:这里task(router)是非常重要的组件,实现task和queue之间的配置管理。

9.3.3 Celery_main

借助celery强大的框架能力,主程序的代码就非常的简单:

from celery import chord, chain, group

def chain_test():

​ files = getter(watch_dir, handler_dir)

​ for f in files:

​ chain(handler.s(f, loader_dir), loader.s(pgtable, pginfo))()

if name == ’main‘:

chain_test()

相关的代码简单说明如下:

1,从celery包中,import了 组件chain()

2,chain()实现立刻组件的异步处理。

3,chain()方法可以异步调用handler.s() 方法,和loader.s()方法。这里s代表的是signature方法,他可以实现异步的task调用,而书写起来非常的简单。

1.3.4 Chain()对象

9 10 chain

​ Object view of celery main

​ 图 9-7 celery main 的对象模型

下面我们看看,在celery_main的一个核心对象 chain()对象。

他是实现celery的异步调度的核心组件。

相关的语句简要说明如下:

def chain_test():

​ files = getter(watch_dir, handler_dir)

​ for f in files:

​ chain(handler.s(f, loader_dir), loader.s(pgtable, pginfo))()

在这里 chain()对象是通过两步生成的:

1,传参:chain(handler.s(f, loader_dir), loader.s(pgtable, pginfo)),通过为chain()传入两个方法,构建了一个对象chain()

2, 生成可调用对象: chain(kwarg)() 变成一个可以调用的对象。

相关的对象可以供主程序进行调用处理。

相关的时序图,简要说明如下:

9 11 chain的时序图

​ 图 9-8 chain()模式调用的时序图

从上图可见,相关的chain()模式的调用时序图,看看核心的操作时序。

1,对象chain 提取一个文件名f

2,对象chain启动一个任务调用, 将task 发送到handler queue中。(根据task和queue的映射关系来路由)。

3, handler.s() 从handler queue中,提取task,并完成任务处理。

4,对象chain启动第二个任务调用,将task 发送到loader queue中。(根据task和queue的映射关系来路由)。

5 、loader.s() 从loader queue中,提取task,并完成任务处理。

6,待两个任务均完成执行, 则chain()调用结束。

9.4 消息的传递

通过rabbitmq-management客户端,从celery使用的Queue中消费信息,其中较为关键的信息为:payload(实际传输消息体),其它为附加信息。

9.4.1 Queue:handler

message信息截图

9 queue handler

由上图可见celerytask调度时消息传递为(接收参数+其它信息),由于handler后chain链式调用loader可以看到“chain”信息不为null,并且包含了调度loader的一些附加信息。

9.4.2 Queue:loader

message信息截图

­­ 9 Queue loader

可见celerytask在queue中传递的消息payload信息结构一致;

在loader调度时由于后续没有chain链式调用,所以消息中“chain”信息为空。

9.4.3 小结

由于celery 强大的封装能力,我们不需要理解底层负责的配置关系,仅仅需要把Queue 和相关的调用方法配置好就可以了。

9.5 总结

现在我们演示了,如何通过celery的微服务方案。

基于celery的微服务,有以下的优点:

1、 无需理解复杂的rabbitmq方案,直接配置broker就可以。

2,通过装饰器的方法,在不修改原子函数的情况下,实现了对方法异步调度的能力。

3、关于celery的管理,采用集中配置的方案,易于理解。

虽然,限于篇幅的关系,没有深入的讲究celery的特性,相关的内容在后续的case中会进一步介绍。

Starter
MicroServ
Tutorials
Blog