Skip to main content

基于kombu的ETL中间件

8 基于kombu的ETL中间件

8.1基于kombu的ETL中间件用例模型

相关的用例模型和第七章的用例模型一样。

设计约束: 使用kombu作为相关的客户端,连接rabbitmq,实现消息的发送,接收。

8.2 核心设计方案

8 kombu based component view

​ 8-1 kombu based 的ETL 方案

在这个方案中,还是使用了进程池的方案,激活了三个进程,进程之间,是通过queue进行通信的。

8.2.1 客户端设计

简化对rabbitmq,我们使用kombu来实现客户端的连接。相关的设计方案,说明如下:

8 1 pika based client

​ 8-2 客户端方案

在本方案中,使用kombu作为客户端的方案。

通过kombu实现produce:

​ self.channel.Producer().publish(

​ body=msg.encode(“utf-8”),

​ retry=True,

​ exchange=self.exchange,

​ routing_key=self.routing_key,

​ declare = [self.queue]

​ )

消费消息,等其它操作,同样也是根据接口,进行封装。

8.3 回调call_back设计

由于客户端有可能会连续的从Queue中接收到消息,所以这里我们要使用一种回调的处理机制。

8.3.1回调的原理

下面看看标准的回调的解释:

Call back: In computer programming, a callback is a piece of executable code that is passed as an argument to other code, which is expected to call back (execute) the argument at some convenient time. The invocation may be immediate as in a synchronous callback, or it might happen at a later time as in an asynchronous callback.

简单的回调模型图,示例如下:

8 3 the principle of call back

​ 8-3 回调的原理简介绍

§ 主程序调用consume函数,传入可执行对象;

§ consume函数内逐条消费数据msg,并调用可执行对象;

本示例中不同进程中,传递不同的callback函数示例图:

8.3.2 应用上下文分析

​ 我们一个进程空间p_handler进行讲解。

1,p_handler 是一个进程空间,他由一个client_instance2 组成;

2,client_instance2 是一个由pika实现的一个客户端可以连接到rabbit-mq,实现消息的生产和消费。

3,client_instance2还包含了一个handler的函数,可以实现相关的功能。

4,client_instance2连接到一个queue hq,从中消费消息,每接收到一个消息,就回调函数handler,实现文件的处理。

5,client_instance2连接到一个queue lq,生产的消息将发送到lq中。

从这个应用的上下文来看, 我们要设计一个handler的进程,他包含rabbit mq连接的客户端,以及具体的handle的功能。

我们看看相关的配置:

pool = multiprocessing.Pool(processes = 3)

​ pool.apply_async(p_handler, (loader_dir,client_kwargs,hq_kwargs,lq_kwargs))

在这里,我们发现进程p_handler 有一个入参是client_kwargs,他其实是一段代码,(client(objent)的一个实例);

这一段代码将会成为进程p_handler的组成部分。

这一段代码负责监听hq_kwargs中是否带处理的消息? 如果有消息到达,就要调用handler的方法。

在这里不是P_handler来调用handler方法,而是由client(objent)的一个实例来负责调用,这个就是我们说的回调。

8.3.3 在消费消息时候的回调详解释

8 4 call back while consume message

​ 8-4 如何在消费消息时候进行回调

8 5 callback to invoke loader

8 6 timing of call back function

下面将相关的回调方法做一个简单讲解

1、定义一个进程 p_loader(),其中有5个入参,loader_dir,client_kwargs,hq_kwargs,lq_kwargs。

2, 初始化一个client的实例,其中的入参为client_kwargs

3,定义换一个回调接口,callback(), 负责消费消息。

4,在回调接口中,如果监听到消息,则传入到一个变量f。

5,在回调接口中,会调用一个loader()方法,相关loader()的参数来自与进程p_loader() 的5个入参之中。 (这是闭包的概念)

6,激活client的client.cosume() 方法,这个方法会一直持续的运行。

7,将callback()代码传入client.consume()中执行,有client.consume()来决定callback()的执行时机。

8,当client.consume()监听到消息的时候,会启动相关的callback()方法

8.4 Generator 方案

In computer science, a generator is a special routine that can be used to control the iteration behaviour of a loop. In fact, all generators are iterators. A generator is very similar to a function that returns an array, in that a generator has parameters, can be called, and generates a sequence of values. However, instead of building an array containing all the values and returning them all at once, a generator yields the values one at a time, which requires less memory and allows the caller to…

在该案例中,使用生成器可以达到相同的效果:

8 7 genetrator function of the message consume

1、主程序调用consume生成器函数;

2、Consume消费msg,生成msg,并暂停;

3、主程序接收msg,并调用消息处理函数;

4、msg处理完成后,consume恢复执行;

5、循环上述操作;

备注:kombu不支持生成器方案,pika可以支持。

8.5 总结

\1. pika和kombu两个模块都提供了通过Python程序,操作rabbitmq的接口;

\2. pika及支持rabbitmq的操作;

\3. kombu的特点是支持多种的符合APMQ协议的消息队列系统。不仅支持原生的AMQP消息队列如RabbitMQ,还支持虚拟的消息队列如redis、mongodb、beantalk、couchdb、in-memory等;

\4. kombu,接口更为简单,而且kombu它支持重新连接策略,连接池,故障转移策略等,且与celery的结合更为紧密;

另外,在我们引入第三方的框架,回调是一种合适的方案:

8 8 PPT of call back

Starter
MicroServ
Tutorials
Blog