Skip to main content

基于celery的高可用ETL方案

10 基于celery的高可用ETL方案

10.1 Celery方案总结

在上一章中,我们使用了celery做了一个ETL方案,通过celery我们实现了一个非常简化的进程间通信解决方案,虽然相关的方案比较重型, 但是具有强大的扩展性,可以非常轻松的实现跨机器的分布式消息通信。

​ 相关的celery的概念模型简要说明如下:

10 1celery overview

​ 图 10-1 celery的基本概念模型

从上图可见,相关的celery的设计模型中,包含了

1,Producer(celery client): 负责生产相关的task,并发送到一个broker中。

2,broker: 作为一个task的路由器,将task分发到不同的接收者。这里broker实现对rabbitmq的封装,屏蔽了rabbitmq复杂的调用细节。

3,Consumer(celery worker):接收到task以后,负责根据task的信息结构,执行相关的功能。

由于采用了归一化的结构,这样可以极大的简化相关的编程模型。实现一种分布式的微服务解决方案。

10.1.1 精简ETL方案存在的不足

以上是简化的ETL方案, 相关的组件图见下图:

10 2 ceery chain

​ 图 10-2 简化版本的ETL方案

从上图可见,相关的方案中,有celey中的chain对象(一个可调用对象)负责任务的异步调用, 这个方案是一个很简约,也很高效的方案。

然而在真实的ETL环境中,尚存在一些问题

1, 在本方案中,待处理的文件,是放置在1个内存中的list[] Files中, 那么如果需要处理的文件非常的多,那么这个list[]需要保持非常长的时间, 假如某种原因内存出现了崩溃,那么后续的程序就无法处理。

2,如果文件的处理,可能成功,也肯可能失败,我们如果文件处理失败,则需要记录这种失败的情况,在list[] 中记录,则会非常的麻烦。

3,如果需要处理的worker将分布在多台服务器上, 那么文件的处理状态更新,也会比较不方便。

4, 文件的处理,采用的是一种迭代的方案来处理,也就是说在list files中的文件是顺序执行的,假如需要处理的文件非常多,则需要考虑一种并行的处理方案。

10.1.2 高可用的ETL方案

针对这个原型方案的不足,我们要考虑一个更加完善的方案。主要的完善点在于:

1, 目前文件的信息是保存在内存中的list中,要考虑将他在内存中进行持久化管理,保持在数据库的一个表中。

2, 针对文件的处理机制,目前采用的是一个迭代器的方案进行调度处理,比较简单,所以要考虑设计一个调度器。

3, 如果文件的数量非常多,导致的文件处理周期很长的情况下,还需要一个周期性的调度器,来实现相关的调度管理。

相关的概念模型见下图:

10 3architecture celery

​ 图 10-3 高可用的ETL方案

从上图可见,以上是一个支持7×24小时运转的ETL 方案,透过这个方案,可以将ETL方案中的多个组件,拆分为微服务,多个不同的微服务通过celery+rabbitmq进行粘合。

10.2 高可用ETL方案的用例模型

10.2.1 用例模型

1、系统定时器timer 每分钟唤醒一次,发送一个采集的task到Queue中。

2、 getter接收到采集的task以后,启动采集工作。

3、 getter扫描临时文件夹(Temp file directory),发现新到达的网点交易文件(需要和已经采集的文件进行比较),将新到达的网点交易文件拷贝到处理文件夹中(handle file directory),并将拷贝的网点交易文件文件记录信息,传入到数据库中的filelist表中。

4,系统定时器timer 每分钟唤醒一次,发送一个任务调度的task到Queue中。

5,jobhandler负责扫描filelist表(待处理文件表)和jobtask(批处理任务表),判断上一批任务的处理情况,根据调度策略, 决定是否启动下一个批次的任务, 如果不满足条件,则等待下一个调度周期,如果满足条件,则启动一下个批次的任务, 发送一个文件处理的task到Queue中。

6, handler 接收到文件处理的task以后,启动文件处理的操作,生成结果集。

7,handler 将结果文件拷贝到装载文件目录下(load file directory),下,并发送一个数据装载的task到queue中。

8,handler 更新filelist表。

8,handler删除已经处理过的网元交易文件。

9、loader从queue中提取数据装载的任务

10, loader 负责处理新到达的结果文件,将结果文件拷贝到数据库中。

11,loader负责更新数据库中的filelist表,记录当前文件的处理完成时间。

12,loader 负责删除已经处理过的结果文件。

10.2.2 领域对象模型

10.2.2.1 Queue

10 4 b1 Task Queue

在本方案中,需要设计四个Queue:

1, getter queue: 用于定时传输启动getter 进行文件采集的task。

2、shcedule queue: 用于定时传输启动jobhandler进行任务调度的task。

3、handler queue: 用于异步传输,调度handler 进行文件处理的queue。

4、loader queue : 用于异步传输,调度loader进行文件装载的Queue。

10.2.2.2 Task

10 5 b2 task view

整个的ETL方案中,采用的是一种task 调度的方案,相关用于调度的task,总共有四个。

10.2.2.3 Filelist

10 6 b3 filelist

在前期的ETL方案中,待处理的文件信息,全部放置在内存中,而在本项目中,放置在数据库当中,这样获得了一种稳健性。

10.2.3 设计约束

在本章节的练习中,要求使用celery的两种模式

1,周期性调度模型 (celery beat)

2,异步调用模式(celery async)

10.3 设计模型

10.3.1 分析模型

首先我们来看看,在相关方案中的分析模型。相关的分析模型见下图所显示。

10 7 analysismodel

​ 10-4 分析模型

从上图可见,相关的分析模型中,我们需要包含组件有

1、beat组件:实现相关的周期性任务调度处理。

2、getter组件:负责实现文件的采集管理

3、schedule组件: 负责对待处理的文件进行分组调度管理,以合理的尺寸来实现文件的批处理。

4,chain()组件: 实现文件的解析和入库管理,而文件的解析和处理是异步的过程。

5、filelist:实现待处理文件的持久化管理,存储于关系型数据库(postgres-SQL)中。

虽然,这个分析模型看起来非常的简约,但是真是可谓是简约但是不简单。这个架构在国外的大数据工作早已广泛的进行使用。假如你想构建一个全国级别的银行数据中心项目,可以使用这个项目来实现相关的ETL处理。 也曾经有公司使用这个架构,实现股票数据的互联网搜集。

所以,深入研究这个架构,就可以非常低成本的构建一个数据中心。

10.3.2 组件模型

下面首先说明一下,组件模型的设计关系,组件模型的引用关系,见下表所显示:

​ 表10-1 商用级ETL组件的引用关系

10 8组件引用关系

以上,是相关商用级别的ETL组件的调用关系,相关的组件图,简要加以说明:

10 9商业ETL方案

​ 图 10-5 商业级ETL的组件

从上图可见,主要的几个组件做一个说明

1,task beat: 实现周期性的定时调度功能,这也是商业级的ETL中间件,实现7*24小时ETL数据采集能力。

2, procfuc: 主要提供了三大原子功能,getter,handler,loader,而考虑到商业级的应用场景,还增加了一些必要的日志,与数据入库的功能,将相关的功能扩展以后,形成了igetter,ihandler,iloader组件。

3,jobhandler: 实现了弹性的任务调度功能。这里的任务调度,主要是针对待处理的文档的调度功能。

下面将针对几个核心的组件进行重点的阐述。

10.4 Procfunc 组件

相比较于原型研究的ETL方案,商业级的ETL方案,需要进行比较多的功能完善,下面首先回顾一下,用于原型研究的单机版本的方案。

10.4.1 单进程ETL方案回顾

10 10 单进程ETL方案

​ 图 10-6 单进程的ETL方案

从上图可见,这是单进程的ETL方案。

从上面的对象图中,我们可以发现, 在这个ETL中,真正的原子是在于三个对象, getter,handler ,loader之间的协作。其中

1,getter和handler之间的协作,是通过内存中的一个list file, 他由对象getter 生成,有handler来消费。

2,handler和loader之间的协作,是通过内存中的一个信息结构lfile,他有对象handler生成,由loader来消费。

详细的时序图说明如下:

10 11 单进程ETL时序图

图 10-7 进程内调用的时序图

调用顺序:

\1. main进程调用getter方法;

\2. getter方法执行完成后;调用handler方法,传参为待处理文件路径;

\3. handler方法执行完成后;调用loader方法,传参为待入库文件路径;

\4. loader方法执行;

\5. main进程结束;

特点:同一进程内,方法间通过传参,顺序执行达到信息传递!

某一环节失败则整个程序关闭!

目前,在商业级别的ETL的方案中,主要的改进点在于:

1、对象之间的协作,是一种异步的调用关系,借助celery 框架来实现。

2,为了适配celery组件,所以的原子组件要进行扩展,实现异步的进程间调用管理。

3、待处理对象将从内存中,持久化到数据库中,这样将根据稳定。

10.4.2初始原子组件的入参

10 12 initial 原子组件

​ 图 10-8 初始原子组件

上图可见,这是前期在四章的第一个原型项目中,三个原子组件之间的协作处理。

​ 主函数只要合理的编排这三个原子组件,就可以实现顺序的组件处理。而在本原型项目中,初始的原子组件,并不需要进行修改。

10.4.3 Procfunc 设计思想概述

在前期版本的设计中,我们设计了三个原子功能,分布是getter,handler,loader, 每个原子组件专注于各自的ETL处理过程。 而在目前的版本中,为了实现商业版本的可用性,引入了数据库来实现部分关键信息的持久化管理。 所以getter,handler ,loader都必须通过功能扩展的方式,来实现一些处理事件的持久化管理。

目前由两种实现方式

A方案:直接在getter 等组件中,增加处理事件持久化的功能。

B方案:将getter组件进行扩展,升级为新的组件Igetter。

两种方法都是可行的,而使用第二种方法,有明显的好处。

1、初始的组件基本不改变。(个别组件,getter增加了文件重复采集的判断功能。

2,扩展部分的共性功能功能,比如数据库连接部分,可以分离出来,设计单独的类进行实现。

3、如果扩展性功能需要调整,仅仅需要在扩展的部分进行修改,不会影响其他的内容。

10.5表结构设计

在商业化的ETL方案中,为了保证应用的稳定性,很多在内存中领域对象模型,需要持久化到数据库中,所以在商业化ETL应用中,表结构设计是很重要的内容。

10.5.1 filelist表结构:

idfilenamedownloadinvoketask_begintask_enderror
serialvarchar(64)timestamptimestamptimestamptimestamptext

表字段说明:

§ id:自增ID,用于任务调度策略标识字段;

§ filename:文件存放在服务器的全路径;

§ download:文件下载时间;

§ invoke:该文件被调度时间;

§ task_begin: 文件实际处理时间;

§ task_end: 文件处理结束时间;

§ error: 记录处理过程中捕获的错误信息;

示例数据:

10 13 table filelist

10.5.2 jobtask表结构

idstartidendidchecktimesinvoke
serialintegerintegerintegertimestamp

表字段说明:

§ id:自增ID;

§ startid:文件调度开始标识,对应filelist中的id;

§ endid:文件调度结束表示,对应filelist中的id;

§ checktimes:等待当前批次完成,定时任务核查次数;

§ invoke:该批任务调度时间;

示例数据:

10 14 table jobtask

将相关数据进行持久化以后,我们就可以稳健的管理ETL组件。

10.6 pgutil功能说明

由于需要与数据库交互,所以基于psycopg2模块封装了一些对postgres数据库的操作,主要封装方法execute,executemany, fetchall。

§ execute(conn_text, query): 执行SQL语句,不处理返回结果;

§ executemany(conn_text, query, vars_list): 执行批量操作,主要用于批量插入;

§ fetchall(conn_text, query): 执行query语句,并获取全部执行结果;

10.6.1 psycopg2模块中主要类和方法

psycopg2模块中主要类connection和cursor部分关键方法属性图如下:

10 15 operation of pgconnector

官方文档:http://initd.org/psycopg/docs/

推荐阅读:https://blog.csdn.net/guofeng93/article/details/53994112

10.7 各组件与数据表交互

\1. igetter: 调用getter同时,记录(filename,invoke)信息到filelist表;

\2. schedule(jobhandler):查询filelist决定调度策略,记录invoke信息到filelist表;

\3. ihandler: 调用handler同时,记录task_begine信息,处理完成后记录;task_end信息,并在处理后删除文件;

\4. iloader:调用loader,并在入库后删除文件;

从上面的分析,我们可以发现各个处理组件都要和数据库的表

进行协作,而这些协作的方案,就是基于前面封装的pgutil组件。

10.7.1 组件关系

10 16 dynamic realtion of ETL

  1. beat任务定时调度igetter,schedule

  2. schedule (jobhandler)调度ihandler和iloader

10.7.2 beat组件

celery 组件最初设计的时候,主要是为了解决不同的组件之间,实时异步调用而设计的。 然而,随着celery的应用广泛,celery的功能进行了扩展。也可以支持定时的周期性调度,而周期性调度是在大数据处理中,非常重要的一个特性。也是需要重点掌握的内容。

​ 另外beat组件,无论是在大数据的后台调度部分,还是在数据中台部分,都有广泛的应用。

而在beat组件中,最核心的是时间策略的表达能力,相关的主要配置见下表:

10 17a2 celery beat

在传统的ETL方案中,如果要支持7×24小时的时间表达,往往有不少的困难,而celery的表达式则非常的强大。

10 18 a3 celery sample

下面以igetter 为例,说明相关的调度方案

10 19 a4 igetter beat

根据配置,采用的是每分钟的调度方案。

crontab(minute=’*/1’)

在这里,对 重新定义了 APP这个对象的相关函数。

10.7.3 组件扩展

在本方案中, 将getter, handler,loader 进行了扩展,实现了对非功能需求:即数据处理,文件操作等。 如果从组件集成的角度,可以理解为透过功能组成的方式实现功能的扩展。

10 20 function extension of ETL component

​ 图 10 功能扩展示例

从上面的设计中,我们可以发现,在一个商用的ETL方案中,要合理的将功能性需求和非功能需求做一个分离。其中功能性需求是从用户层面需要的功能,也是用户愿意付费的支撑生产的部分。而非功能性需求,则是保证系统可用性的内容,虽然用户无法感知到这一部分需求的价值,然而这是用户愿意选择一个厂家开展合作的重要判断依据。

如何通过有限的成本,同时兼顾这两方面的需求呢,上面的软件工程实践,给出了一个最佳的范例。

10.7.4 igetter时序简介

10 21 sequence diagram of igetter

可以看到igetter负责与数据库的交互,getter负责下载任务。

10.7.5 igetterpipeline穿越

10 22 i getter pipeline ­­­­

10.7.6 ihandlerpipeline穿越

10 23 i handler pipeline

10.8 jobhandler设计

10.8.1 组件

jobhandler方法及关联组件见下图:

10 24 inter working of jobhandler

n fetchid方法负责获取当前批次,每一组任务startid及endid

² 查询jobtask和filelist表,获取上一批次已发送的批次起始结束ID。

n trigger方法负责决定是否触发下一次任务调度

² 查询filelist表task_end、error字段,查看上一批次文件处理完成率。在多次查询完成率均未达到阈值情况下。直接查询queue中未处理消息量,决定是否触发下一次任务调度。(待完善)

n invoke负责调度任务,控制调度策略(例如:大包小包分离)

² invoke触发后,获取下一批次startid,endid并分组发送任务。

10.8.2 trigger策略

\1. 查询jobtask最一条数据,获取startid,endid(上一批的起始结束id);

\2. 查询filelist中startid<=id<=endid信息,获取taskend或error字段不为空的比例,为完成率;

\3. 如果完成率大于,设定阈值,触发invoke;

\4. 否则等待下一次触发;

\5. 多次尝试完成率均低于阈值(可能原因:docker stack重启,已发送任务被丢弃)。则重启发送最后一批任务,由于handler接受任务时有对已处理文件过滤,所以不会重复处理。

10.9​ celery task之间消息传递

在本方案中,规划了4个Queue, 相关的消息结构,简要说明如下。

10.9.1 Queue:getter

10 25  queue getter

10.9.2 Queue:schedule(jobhandler)

10 26 queue schedule

10.9.3 Queue:handler

10 28 queue handler

10.9.4 Queue:loader

10 29 queue loader

Starter
MicroServ
Tutorials
Blog