Skip to main content

基于 pika的ETL方案

7 基于 pika的ETL方案

7.1 多进程方案回顾与讨论

在前面的两个章节中,我们使用了Queue的方案实现了一种异步调度的ETL方案,其中多线程方案中,使用的是python的内嵌的queue组件,而多进程方案中,使用的是python的多进程方案中的queue组件。

在这两种方案中,queue都是一直在内存中运行的, 假如ETL的是在以下的场景下运行:

1, ETL需要长时间的连续工作,可能超过了N×24小时的情况。

2、ETL需要处理海量的文件,这样在queue上需要传递大量的事件,可能1秒中传递的消息量会有上千条。

3,如果因为某种原因,queue出现了退出服务,希望能够消息不丢失。

在这种真实的大数据的ETL场景中,使用python原生的queue组件,或者是多进程的组件,实现起来有很大的难度,所以这时候,就要求使用专用的queue中间件。

7.1.1RabbitMQ中间件

RabbitMQ中间件是目前在大数据云平台中,最广泛使用的一种中间件,RabbitMQ主要是应用在银行,证券,金融领域, 作为分布方式的Queue消息中间件,而随着大数据云平台的广泛的推广,RabbitMQ日益成为一个必须的标准组件。

71 rabbitMQ in cloud

​ 图 7-1 rabbitMQ在云计算环境下

​ 为什么这么说呢? 我们知道,在面向对象的程序设计中,需要解决的一个核心问题,是对象之间的协作,也就是我们常常提起的对象之间的动态关系。所以在对象设计层面,我们要关注对象的操作(operation), 在组件设计的层面,我们要关注组件的接口(interface),

,而在系统集成层面,我们要关注服务(service)的发布,查找,调用等一些列的问题。

而通过引入rabbitMQ这样的分布方式消息(Queue)中间件,我们可以极大的简化服务之间的互操作设计,透过一种异步的服务调用模式来实现不同组件之间的协同。进而简化,大数据的组件设计。

7.1.2 Pika中间件

从上面的介绍中,我们了解到RabbitMQ是在大数据微服务应用中的一个核心组件, 但是在项目中直接使用RabbitMQ则是比较困难,因为RabbitMQ是一种高可用的分布式Queue组件,基于erlang编写。所以想自己实现一个客户端,还是非常艰困的。 好在,在python的世界中,总是有一些活雷锋,可以帮助我们打造好轮子。Pika就是这样一款用于连接rabbitmq的客户端,。

以下是官网对pika的介绍:

Pika is a pure-Python implementation of the AMQP 0-9-1 protocol that tries to stay fairly independent of the underlying network support library.

If you have not developed with Pika or RabbitMQ before, the Introduction to Pika documentation is a good place to get started.

在版本章节中,我们就计划使用相关的插件来实现一种分布方式的ETL方案。

7.2基于queue中间件的ETL方案

7.2.1用例事件模型

下面将从对整个用例的事件序列进行描述:

1、 getter扫描临时文件夹(Temp file directory),发现新到达的网点交易文件,将新到达的网点交易文件拷贝到处理文件夹中(handle file directory)。

2,getter将新到达的网点交易文件事件信息 写入到一个待处理队列hqueue中。

3,handler 负责监控hqueue中的新文件事件到达的情况,如果有新文件事件到达,则提取相关的文件的URL信息(文件的绝对路径)。

4,handler 对网点交易文件进行处理,生成结果文件,将结果文件拷贝到装载文件目录下(load file directory)下。

5,handler将新生成的网点交易结果文件写入到一个待装载队列lqueue中。

6, loader负责监控hqueue中的新文件事件到达的情况,如果有新文件事件到达,则提取相关的文件的URL信息(文件的绝对路径),处理新到达的结果文件,将结果文件拷贝到数据库中。

7, 用例执行完毕。

1.2.2 设计约束

在本案例中,要求使用的中间件

1、分布式队列中间件:rabbitMQ

2,python连接客户端:pika

7.3 分析模型

调用原理与单机上多进程、线程异步调用大致相同。主要的区别:

1、queue不再是单机上的进程内、线程内的共享python变量;

2、需要通过pika提供的操作接口,操作queue实体;

3、可在不同的机器上操作同一个queue实体(分布式);

基于消息中间件的消息传递图,如下:

7 2 pika client and queue

python与queue实体,连接接口说明:

 1、Broker Client负责连接消息中间件,并提供produce,consume方法;

 2、produce方法,向消息队列中发送消息;

 3、consume方法,从消息队列中获取消息;

特点:进程、线程间通过客户端,连接消息中间件,操作消息队列实体,进行¬¬信息共享!

7.4 设计模型

下面接下来,详细说明一下相关的组件模型。

7 3 pika based component view

​ 图 7-3 component view pika based ETL

从上图可见,相关的组件模型,主要包含3部分。

1,borker : 提供相关的连接rabbit-MQ的客户端组件。

2,variables:为支持broker运行,以及Queue的运行提供的全局化配置参数。

3,procfunction: 提供原子的ETL运行组件。

4,process_pool_pika: 提供主程序,调用相关的ETL原子组件。

7.4.1 Rabbit-MQ client 配置信息

在组件variables中,提供了关于rabbit MQ 客户端的配置代码,相关的配置说明如下:

​ client_kwargs = dict(

​ broker_url = ‘amqp://guest:guest@localhost:5672’,

​ exchange = ‘handler’,

​ exchange_durable = False,

​ exchange_type = ‘direct’

​ )

在相关的配置中,broker_url 配置了相关的rabbit-MQ的服务端连接信息。

7.4.2 Queue 的配置信息

在组件variables中,提供了本项目使用的两个Queue的信息,相关的配置说明如下:

1,用于getter和handler之间交换数据的Queue

hq_kwargs= dict(

​ queue_name = ‘handler’,

​ queue_durable = False,

​ routing_key = ‘handler’

​ )

2,用于handler和loader之间交换数据的Queue

lq_kwargs = dict(

​ queue_name = ‘loader’,

​ queue_durable = False,

​ routing_key = ‘loader’

​ )

7.4.3 Broker 的组件模型

由于连接rabbit-MQ中间件比较复杂,所以我们要专门封装一个客户端对象,实现对rabbit-MQ的连接。主要的方法见下图。

7 5 operation of broker client

7-5 broker client 的主要接口方法

  1. init方法中连接rabbit;

  2. 在consume消费数据;

  3. 在produce生产数据;

  4. close方法关闭连接;

透过以上的方法,用于程序可以调用客户端来使用rabbit-mq提供的底层功能。

7.4.4 基于pika实现的broker客户端

在本方案中,我们使用pika来实现相关的broker的客户端功能, 相关的规划见下图:

Broker模块中类关系图:

7 6 pika based client

​ 图7-6 pika实现的客户端

1 、 PikaClient,KombuClient实现了统一接口类AbstractClient

2 、 Client类实例化PikaClient,KombuClient,做对外接口;

7.4.5 Processpool_pika

最后我们看看,相关的process_pool_pika的实现方案:

7 7process_pool

​ 图 7-7 process_pool_pika

上图可见,相关的case中,主程序是通过进程池的方案,创建了3个进程空间。

我们一个进程空间p_handler进行讲解。

1,p_handler 是一个进程空间,他由一个client_instance2 组成;

2,client_instance2 是一个由pika实现的一个客户端可以连接到rabbit-mq,实现消息的生产和消费。

3,client_instance2还包含了一个handler的函数,可以实现相关的功能。

4,client_instance2连接到一个queue hq,从中消费消息,每接收到一个消息,就回调函数handler,实现文件的处理。

5,client_instance2连接到一个queue lq,生产的消息将发送到lq中。

7.5 总结

在本章的介绍中,我们通过pika实现了一个简单的ETL的功能。

1,通过pika,可以非常简化的使用rabbit-mq

2,通过pika+rabbitmq,可以非常简化的实现进程间通信的目标。

3,rabbitmq是专用的Queue中间件,所以运行更加稳定。

Starter
MicroServ
Tutorials
Blog