Skip to main content

利用微服务方案实现敏捷原型分析

1 2 利用微服务方案实现敏捷原型分析

在第10章中,我们介绍了一个商业级别的ETL方案,实现了周期性的数据采集处理。相关的模型见下图

12 1 dynamic realtion of ETL

​ 图12-1 商业级别的ETL方案

在这个方案中,我们主要实现了三方面的重要特性:

1、周期性调度: 基于celery beat 的强大特性,实现了灵活的调度管理。

2、持久化管理:主要的服务组件,添加了基于数据库的持久化管理功能,方便跟踪组件的服务状态。

3,异步调度: 不同的服务进程之间,通过queue的异步任务调度方案进行协同。

所以,对于需要7*24小时的ETL项目,是非常适合的。不过,相关的组件规模稍微显得比较庞大,且设计的环节,也比较复杂。读者可能会感觉,如此完善的ETL方案,是不是比较笨重呢? 显然,celery+rabbitmq这套方案,和孙悟空的金箍棒比较类似,经过设计者的巧手设计,剪裁为不同的数据ETL方案,尤其适合在敏捷开发的场合。

12.1 多种不同的软件中间件

​ 在11章我们曾经讨论过:是不是掌握了一套高度扩展的ETL中间件就万事大吉了呢,非也。 这不过是开始进入了商业级大数据应用的殿堂而已。 后续的学习和精进主要三个层面。

1、使用面向对象的软件工程方法(object oriented software engineering) 的方法,快速构建需求模型。

2、掌握一系列的软件和中间件,敏捷的将需求模型转换为实现模型。

3,软件系统上线以后,及时的进行优化资源的配置,提高硬件的利用率。

​ 在今天的案例中,我们将展现,如何使用多种不同的组件,来提升我们ETL方案的效率。

12.2需求模型

12.2.1 背景介绍

在实际的ETL项目中,我们需要把将数据源的数据进过处理以后,发送到不同的目的地去,打造多种不同的数据APP单元中,相关的背景见下图所显示:

12 2 multi output case

​ 图 12-1 多输出ETL方案

上图是一个典型的大数据人工智能的应用方案,这个方面中,主要的挑战有:

1、高效率的从多个不同的数据源中抽取数据。

2、考虑不同操作之间的依赖关系。

3、并行化的部署计算组件。

4,对每一个ETL task执行的结果 success/failure 事件都可以产生一个报告。

5、能够基于ETL数据处理的周期性要求,编排数据处理的多个方面。

12 x

1.2.2 用例事件描述

1, 用户将待测试的邮件拷贝到文件夹下。

2、用户启动ETL数据处理过程。

3、timer 首先发送一个email解析的任务,其中携带了email文件的路径,传送到相应的queue中。

4,解析器接从queue中提取这个task, 获得一封email,解析邮件内容,提取其中的信息结构,形成一个数据集。并返回一个字典对象。

5,timer 接收到邮件解析完毕以后的字典对象以后, 同时发出两个task,一个task是将数据字典拷贝到关系型数据库中,另外一个task是将数据字典拷贝到搜索服务器ES中去。

6,关系型数据库装载器从queue中接收到这个任务,将数据拷贝到关系型数据库中。

7, ES装载器从queue中接收到对应的任务,将数据拷贝到ElasticSearch服务器中。

8,三个任务均执行完毕以后,一封邮件处理完毕。

9,全部邮件处理完毕以后,用例执行结束。

12.2.3 领域对象模型

12.2.3.1 邮件对象

12 2 domain object modelof email

12.2.3.2 Queue

12 3 task queue

图 12-2 项目中的中间件

12.2.4 项目中的中间件

12 4 component

以上是本次原型练习中需要安装的中间件。

12.3 分析模型

12 5 analysis mode of email

​ 图 12-3 项目中的分析模型

从上图可见,有两个核心的组件

1、 worker: 负责具体的处理email数据

2、Queue :用于传递task

12.4 设计模型

12 5 gloal component view

​ 图 12-4 设计模型

从上图可见,整个方案中,包含了四大部分

1,fabric: 负责启动这个处理任务。

2,worker: 负责处理相关的任务处理。

3, broker: 负责对task message进行路由和分发。

4、backend: 负责记录相关的执行结果。

12.4.1Fabric

本项目中,使用Fabric组件,实现相关的ETL启动处理。

The component fabric is used to start the celery APP

For example

:fab workers:start (will work as follows):

​ local(“mkdir -p celery-pids celery-logs”)

​ local(“celery multi {} parse db_deploy es_deploy celery “\

​ “-Q:parse parse -Q:db_deploy db_deploy -Q:es_deploy es_deploy -Q:celery celery “\

​ “-c 2 -c:celery 1 “\

​ “-l info -A proj “\

​ “—pidfile=celery-pids/%n.pid —logfile=celery-logs/%n.log”.format(action))

Note:

1、the mkdir command is used to create the log directory!

2、The Celery command is used to start the celery APP。

12.4.2 MessagesTask

We use the object proj/tasks.py to define the Messages Tasks, we have 3 tasks in our application. 下面看看相关的task的定义:

12.4.2.1 Email parse

1、to parse the email, and produce the messeg_dict in the memory:

@app.task(base=MessagesTask, queue=“parse”)

def parse(filename):

​ """Parse an email file. Return as dictionary"""

# Call the method in the base task and return the result

return parse.parse_message_file(filename)

12.4.2.2 Email persistance

2、to copy messeg_dict into the database

@app.task(base=MessagesTask, queue=“db_deploy”, ignore_result=True)

def deploy_db(message_dict):

​ """Deploys the message dictionary to the MySQL database table"""

# Call the method in the base task

deploy_db.database_insert(message_dict)

3、To create the Elastic Search index according to the messeg_dict

@app.task(base=MessagesTask, queue=“es_deploy”, ignore_result=True)

def deploy_es(message_dict):

​ """Deploys the message dictionary to the Elastic Search instance"""

# Call the method in the base task

deploy_es.elasticsearch_index(message_dict[‘message_id’], message_dict)

12.4.3 任务调度顺序

相关任务调度的代码如下:

def process_one(filename=None):

​ """Enqueues a mail file for processing"""

​ res = chain(parse.s(filename), group(deploy_db.s(), deploy_es.s()))()

print “Enqueued mail file for processing: {} ({})“.format(filename, res)

note 1:the object chain and group are both celery component

note 2: form the above definition , we will first parse the email , and then make the action deploy_db and deploy_es

如上定义,先完成parse , 再同时完成deploy_db 和 deploy_es Simultaneously

12 6 sequence diagram

12.5 总结

以上是一个真实的ETL项目,通过快速原型的实现,我们可以发现:

1,celery框架可以非常方便的构建,ETL分析原型项目。

2,fabric 则可以很方便的进行服务编排。

3,celery提供了强大的服务编排能力,可以轻松的实现各种需要的服务编排。

Starter
MicroServ
Tutorials
Blog