Skip to main content

基于多进程的ETL方案

5 基于多进程的ETL方案

5.1 多进程ETL方案用例模型

为了专注于分布式微服务设计,我们对相关的ETL的用例进行模型进行了适当的简化,下面将相关的用例模型进行说明

5.1.1 用例事件模型

​ 下面将从对整个用例的事件序列进行描述:

1、 getter扫描临时文件夹(Temp file directory),发现新到达的网点交易文件,将新到达的网点交易文件拷贝到处理文件夹中(handle file directory)。

2,getter将新到达的网点交易文件事件信息 写入到一个待处理队列hqueue中。

3,handler 负责监控hqueue中的新文件事件到达的情况,如果有新文件事件到达,则提取相关的文件的URL信息(文件的绝对路径)。

4,handler 对网点交易文件进行处理,生成结果文件,将结果文件拷贝到装载文件目录下(load file directory)下。

5,handler将新生成的网点交易结果文件写入到一个待装载队列lqueue中。

6, loader负责监控hqueue中的新文件事件到达的情况,如果有新文件事件到达,则提取相关的文件的URL信息(文件的绝对路径),处理新到达的结果文件,将结果文件拷贝到数据库中。

7, 用例执行完毕。

5.1.2 实现方案约束

在本次练习题目中,我们要求使用多进程的方案,其中三个核心组件,getter,handler,loader分布在三个不同的进程之间, 三大核心组件不能直接调用,而必须通过queue来进行信息的交互。

相关的设计约束见下图所显示:

5 process communication

​ 图 5-1 多进程的进程间通信

5.2 单进程方案回顾

cp4 数据处理pipeline

调用顺序:

\1. main进程调用getter方法;

\2. getter方法执行完成后;调用handler方法,传参为待处理文件路径;

\3. handler方法执行完成后;调用loader方法,传参为待入库文件路径;

\4. loader方法执行;

\5. main进程结束;

在上一章中,我们使用单进程的方式实现了ETL的组件方案,这个方案中不同的组件之间是顺序调用的方式进行处理。这种实现方案的好处是开发比较简单,但是存在的不足也很明显:

1, 由于不同的组件之间是顺序调用的,这类似于一种阻塞的调用方案, 所以针对计算资源的利用效率比较低。

2, 如果处理的文件数量非常多的情况下,这种方案的处理效率比较低,无法在短时间内完成文件的处理。

3, 由于组件是顺序的执行关系,所以如果要添加功能,也不是很方便。

所以 ,我们要考虑一种更加高效率的设计方案。

5.3 多进程设计模型

52 multiprocess model

​ 图 5-2 多进程设计模型

调用顺序:

\1. main进程同时启动p_getter,p_handler,p_loader进程、线程;

\2.

Ø p_getter调用getter方法,getter处理完成后,p_getter将待处理文件路径,依次put至hqueue中;

Ø p_handelr从hqueue中获取一条待处理文件路径,并调用handler方法,handler处理完成后,p_handler将待入库文件路径put至lqueue中;p_handler从hqueue中取下一条消息继续处理;

Ø p_loadr从lqueue中获取一条待入库文件路径,并调用loader方法,loader完成后,p_loader从lqueue取下一条消息继续处理。

5.3.1 组件模型设计

1 process-multi

​ 图 5-3 组件模型

从上图可见,相关的组件模型简要说明如下:

1, 独立进程: 在本方案中,将getter,handler,loader中在不同的进程中启动。进程之间不直接通信。

2,本方案中,使用了两个queue。 实现getter与handler;以及handler和loader之间的异步消息通信。

5.3.2 多进程的空间

2 multi process space

​ 图 5-4 multi-process space

从上图可见,当程序开始运行的时候,总共划分了四个不同的进程空间,下面分别简述如下:

1,主进程空间: 其中包含了主要的调度运行程序。 本案例中使用的两个queue 也在这个空间中运行。

2、p_getter空间: 其中包含了对文件采集相关的进程空间。

3、p_handler空间:其中包含了对文件解析处理的进程空间。

4,p_loader空间:其中包含了对数据库进行装载的进程空间。

这里,主进程空间可以和其他三个进程空间通信,而其他三个进程空间之间从采用一种异步通信的模式,他们之间的通信必须要通过主进程空间来实现。

5.3.3 主程序简介

if name == ’main‘:

​ hqueue = Queue()

​ lqueue = Queue()

# process

​ p_getter_process = Process(target=p_getter, args=(watch_dir,handler_dir,hqueue))

​ p_handler_process = Process(target=p_handler, args=(loader_dir,hqueue,lqueue))

​ p_loader_process = Process(target=p_loader, args=(pginfo, pgtable,lqueue))

​ p_getter_process.start()

​ p_handler_process.start()

​ p_loader_process.start()

​ p_getter_process.join()

​ p_handler_process.join()

p_loader_process.join()

在上面的配置中,主要有两方面的配置

1,配置queue,在主进程中运行。

2,配置3个进程, 传入参数,并启动进程。

5.4 进程间通信

在多进程的情况下,可以选择利用queue来配置相关的进程间通信,相关的配置语句说明如下:

from multiprocessing import Process,Queue

注意,这里使用Queue是来自包multiprocess, 他采用一种类似socket的机制实现,跨进程的通信处理。

5.5 事件驱动的ETL

在第二章的讨论中,我们介绍过,ETL方案中的三大组件getter,handler,loader需要进行协作,在本章中,我们采用了一种事件驱动的方案。

在这种方案中,三个组件之间是一种松散耦合的关系,组件是通过从queue中接收事件的方式,来启动程序的运行。这种方案的好处有:

1、不同组件的耦合性比较低,这样为分布式部署留下了便利。

2,ETL方案的稳健性有比较大提升。个别组件的失败比较容易隔离。

3,对软件升级比较方便。

Starter
MicroServ
Tutorials
Blog