Skip to main content

基于单进程调用的ETL方案

4 基于单进程调用的ETL方案

在本项目中,我们计划采用一种基于函数调用的方案。在这个方

案中,我们假设所有的组件处理都是在一台计算机上,而且所有的处理组件在一个进程(process)中完成,所以我们有一种最基础的设计方案。

4.1 分析模型细化

1分析模型细化

​ 图 4-1 分析模型细化

从上图可见,相关的的处理过程中,主要的处理对象说明如下:

1、getter: 负责扫描temp file directory中新到达的文件夹,将文件拷贝到handle file directory中, 并在数据库的filelist表中批量记录新到达的文件清单。

2,handler: 负责从handle file directory中提取需要处理的文件,加载在内存中,提取其中的信息结构,并形成相关的结果集文件,并拷贝到load file directory中。

3, loader: 负责从load file directory 提取需要处理的文件,并将结果集文件拷贝到数据库中的bank ticket 表中。

4.2 设计模型简化

​ 由于本章侧重与原子服务的拆分,所以在本章节中,设计模型进行了适当的简化处理。我们采用单进程方案进行处理。

4.2.1 单进程内顺序调用

1数据处理pipeline

​ 图 4-2 数据处理的pipeline

调用顺序:

\1. main进程调用getter方法;

\2. getter方法执行完成后;调用handler方法,传参为待处理文件路径;

\3. handler方法执行完成后;调用loader方法,传参为待入库文件路径;

\4. loader方法执行;

\5. main进程结束;

4.2.2 组件模型

2component view of the process

​ 图 4-3 组件模型

从上图可见,主要有三个组件来实现相关的功能。

1,组件procfunc: 负责提供三个原子函数, getter,handler ,loader 实现文件扫描,文件处理,和文件装载。

2,组件variable:负责提供全局的配置功能,为各个应用组件提供环境变量的配置。

3,single_process main:负责启动单进程,顺序完成文件处理过程。

4.3 Process function

组件process function负责提供三个原子组件,实现getter,handler,loader的功能。下面对相关的原子组件的功能实现进行简单介绍。

4.3.1 Getter

Getter 负责进行文件的拷贝处理。

4.3.1.1 os.listdir

​ 由于getter需要对文件夹进行扫描,这里需要使用一个文件夹中文件遍历的函数os.listdir,相关其中的语法为

os.listdir(path=’.‘)

Return a list containing the names of the entries in the directory given by path. The list is in arbitrary order, and does not include the special entries ’.’ and ’..’ even if they are present in the directory.

os.listdir() 方法用于返回指定的文件夹包含的文件或文件夹的名字的列表。这个列表以文件名称的字母顺序排列,返回的列表 不包括 ’.’ 和’..’ 即使它在文件夹中存在。只支持在 Unix, Windows 下使用。

os.listdir

4.3.1.2内置函数zip()

在本模块中,还会使用一个python的内置函数zip()

根据手册,相关的含义简单说明如下:

zip(*iterables)

Make an iterator that aggregates elements from each of the iterables.

Returns an iterator of tuples, where the i-th tuple contains the i-th element from each of the argument sequences or iterables. The iterator stops when the shortest input iterable is exhausted. With a single iterable argument, it returns an iterator of 1-tuples. With no arguments, it returns an empty iterator.

内建函数zip()勇于创建一个迭代器,以聚合每个可迭代对象中的元素 。返回一个元组的迭代器,其中第i个元组包含每个参数序列或可迭代对象中的第i个元素。 当最短的可迭代输入耗尽时,迭代器将停止。 使用单个可迭代参数,它将返回1元组的迭代器。 没有参数,它将返回一个空的迭代器。

参考案例如下:

>>> x = [1, 2, 3]

>>> y = [4, 5, 6]

>>> zipped = zip(x, y)

>>> list(zipped)

[(1, 4), (2, 5), (3, 6)]

​4.3.1.3 Getter的对象模型

3 objectview getter

​ 图 4-4 getter 的对象模型

从上图可见,getter 的对象模型。 相关的处理过程简要说明如下:

1、 入参为(srcfiles,destfiles),是两个文件路径,一个是源文件路径,另外一个是目标文件路径。

2,首先是对源文件路径srcfiles进行扫描,检索其中的文件, 将结果传入一个列表files中。

3,将列表files中的每一个文件名分别和入参中的源文件路径,目标文件路径进行文件名拼接,生成两个列表,分布是srcfiles,其中记录了源文件列表;dstfiles,其中是目标文件列表。

4,利用内建函数zip()对两个列表srcfiles和dstfiles进行迭代租户,生成一个新的列表src_dst,其中记录了(源文件,目标文件)的拷贝关系。

5,根据列表src_dst,执行文件拷贝操作,将文件从源文件路径,拷贝到目标文件路径,而文件名称不变。

6,文件拷贝完毕以后,返回一个列表dstfiles,其中记录了在目标文件路径下的详细的文件列表。

备注:本实现中,进行了简化处理,暂时未处理filelist,而仅仅执行文件拷贝操作。

4.3.2Handler

相关的代码简单说明如下:

def handler(src, dstdir):

​ dst = os.path.join(dstdir, os.path.basename(src))

​ shutil.copyfile(src, dst)

​ time.sleep(2)

​ print(” 解析 %s done !”%src)

return dst

相关的处理过程,简要说明如下:

1、入参为 (src,dstdir),其中src是一个列表,其中包含了需要拷贝的源文件清单,而dstdir是目标文件路径。

2,根据 需要拷贝的文件清单,和目标文件路径,创建一个列表,其中包含了目标文件清单。

3,根据源文件清单,目标文件清单,执行拷贝操作。

4,返回一个列表目标文件清单dst(在第二步的处理中生成)。

4.3.3 Loader

Loader 负责将文件拷贝到postgres-sql数据库中,在本实验案例中,我们使用psycopg2作为数据库的连接器。

4.3.3.1 psycopg2简介

4 psycopg

Pscopg是在python编程中最流行的posgres-sql连接器,主要的函数说明如下:

1、psycopg2.connect(database=“testdb”, user=“postgres”, password=“cohondob”, host=“127.0.0.1”, port=“5432”)

  这个API打开一个连接到PostgreSQL数据库。如果成功打开数据库时,它返回一个连接对象。

2 、connection.cursor()

该程序创建一个游标对象将用于整个数据库使用Python编程。

3、 cursor.execute(sql [, optional parameters])

此例程执行SQL语句。可被参数化的SQL语句(即占位符,而不是SQL文字)。 psycopg2的模块支持占位符用%s标志。

4.3.3.2 loader的对象模型

5object view of loader

​ 图 4-5 object view of loader

从上图可见,相关的对象模型主要包含三部分

1, 文件对象: 根据入参中的filename, 读取相关的文件,转换为文件对象,并逐行读取相关的信息结构。

2、数据库连接器: 根据入参中的pginfo, 和相关的table参数,生成相关的数据库连接对象,和游标对象,实现数据的拷贝入库。

3,数据库系统: 实现将数据拷贝到postgres-sql的一个表中。

4.4 single_process main

我们通过一个主函数,就可以很轻松的实现相关的单进程的操作。

if name == ’main‘:

​ files = getter(watch_dir, handler_dir)

​ for f in files:

​ lfile = handler(f, loader_dir)

​ loader(pginfo, lfile, pgtable)

相关的处理过程,简要说明入下:

1,调用getter模块,将文件从watch_dir拷贝到handler_dir, 返回一个列表,其中包含了handler_dir中详细的文件信息。

2, 对列表中的文件进行遍历操作。

3,调用handle模块,将文件从handler_dir拷贝到load_dir; 返回一个对象是文件路径,他用于指示文件在load_dir中的路径。

4,调用loader模块,将lfile (待装载文件的决对路径信息) 拷贝到数据库中。

5,依次处理相关的文件,知道全部的文件处理完毕。

相关的对象模型,简单说明如下:

6 object view of sigle process

图 4-6 主函数的对象模型。

4.5对象之间的协作

从上面的对象图中,我们可以发现, 在这个ETL中,真正的难点是在于三个对象, getter,handler ,loader之间的协作。其中

1,getter和handler之间的协作,是通过内存中的一个list file, 他有对象getter 生成,有handler来消费。

2,handler和loader之间的协作,是通过内存中的一个信息结构lfile,他有对象handler生成,由loader来消费。

详细的时序图说明如下:

7 sequence diagram of main

图 4-7 进程内调用的时序图

调用顺序:

\1. main进程调用getter方法;

\2. getter方法执行完成后;调用handler方法,传参为待处理文件路径;

\3. handler方法执行完成后;调用loader方法,传参为待入库文件路径;

\4. loader方法执行;

\5. main进程结束;

4.6总结

在本章的练习中,我们通过单进程的方式,实现了几个对象之间的协同实现了一个简单的ETL的功能。

1,使用单进程的方式,程序的编写比较简单。

2,使用单进程的方式,不同的对象在进程的空间中进行信息交换,通过传参的方法来实现。

3,这种方案可以用例理解原理,但是不适合大型的分布式的项目。

Starter
MicroServ
Tutorials
Blog