Skip to main content

Python BigData Starter CP08 基于pandas生成报表

经过流转换处理以后,我们解决了storage层面的问题,接下来就考虑如何从data到information的问题。传统的手段是利用数据库的sql语句,不过在大数据的世界里,这种技术往往无法奏效,所以我们要考虑更加高效的方案。 而python刚好提供了很好的解决方案,利用pandas强大的ETL功能,可以在内存中实现强大的数据聚合功能。

8 基于pandas的数据报表生成

​ 经过了前面流转换部分的练习,我们已经掌握了如何的将数据从多个外部数据源汇聚了中心机房,并缓存在一个目标文件夹下。 从某种意义上说,实现了storage的功能,虽然还是非常初级程度的数据持久化管理,然而已经为后续更高层级的数据处理打下了坚实的基础。 相关数据处理的层次详见下图:

8-1 数据应用的层次

​ 图8-1数据应用的层次

​ 从第三章到第七章,全部介绍的是流对象转换的问题,也就是如何将现实世界的对象,通过一系列的转换程序,采集到IT系统的数据持久化层,完成storage的目标。

​ 虽然这些练习看起来非常的基础,但是实际上是整个ETL处理中最为复杂,最为艰困的部分,因为这一步的处理,一方面要和操作系统有比较多的协作,同时要兼顾容错性的需求,最后实现数据的归一化处理。

​ 而在后面处理的部分,虽然涉及到的概念比较复杂,但是因为是在归一化的环境进行处理,所以容错性处理的要求相对比较简单。所以这一部分的学习会相对比较轻松。

​ 以上三大模块的训练,读者就可以掌握,如何实现在大数据的应用场景下,实现从storage, data,information 三个层面的数据应用能力。

8.1 问题背景分析

当一个营业网点的多张ticket清单记录回传到中心机房以后,经过解压缩转换,XML2CSV转换。多个CSV文件合并以后,就形成了一张归一化的CSV格式大交易的清单。相关的处理过程见下图所显示:

8-2 数据应用归一化用例集合

​ 图 8-2 数据归一化处理的用例集合

经过前面繁琐而复杂的处理过程,我们终于很开心的获得了一个很规范的CSV文集。

​ 前面介绍,经过流转换模块的四大用例, 我们已经可以收集到一个营业网点的在15分钟的交易清单列表, 那么作为银行总部,就希望在第一时间掌握以下的资讯。

1,一个营业网点在15分钟内交易的次数,和交易的金额数量、

2,一个特定用户在15分钟内交易的次数,和交易的金额数量。

3,上述两个报表,在每个小时和每天的统计数据。

在这个章节中,我们将学习如何利用python来制作报表。

8.1.1 用例回顾-报表生成。

​ 经过流转换模块以后,我们已经获得了1个营业网点在15分钟粒度的交易信息,那么我们就需要生成相关的统计报表。相关的用例简要说明如下。

1,timer 每15分钟,扫描新生成的交易清单文件(CSV格式)。

2,timer 首先以交易网点,和交易类型为关键字,进行统计,对15分钟粒度类发送的交易次数,交易金额进行累加,生成网点粒度的统计报表。

3,timer继续以客户id ,和交易类型为关键字,进行统计,针对15分钟粒度发生交易次数,交易金额进行累加,生成客户维度的统计报表。

4,timer将两个报表进行持久化处理。

8.1.2 输入数据 (I)

​ 从上图可见,在本阶段,我们希望能够对海量的数据进行处理,形成我们需要汇总数据。而结合上一个章节介绍的用例故事,可以提炼用例中包含的领域对象模型。

8-3 报表输出数据

​ 表8-3 输入数据

​ 结合上面的领域对象模型,我们可以发现虽然原始的交易数据比较多,但是经过提炼处理以后,可以非常方便的掌握每个营业网点,和每个客户的总体交易信息。这就是从数据到信息的价值。

输入的数据样例为

img

从文件处理的角度,输入的对象为

[input filename; [output file1, output file 2]]

举例如下:

[QLR201902071000-merged.csv; [QLR201902071000-re-site.csv, QLR201902071000-re-customer.csv]]

说明如下:

1, 等待处理的输入数据文件,QLR201902071000-merged.csv(此处为绝对路径。

2, 期望获得的输出文件数据,包含两个文件

QLR201902071000-re-site.csv 站点级别汇聚的数据。

QLR201902071000-re-customer.csv 用户级别汇聚的数据

(此处为绝对路径)

8.1.3处理过程 (P)

处理过程,大概说明如下

1, 根据输入数据CSV文件,生成一个dataframe1

2, 读取报表配置文件,尝试生成第一个报表。

3, 根据配置文件, 创建1个新字段,用于表达时点信息。

4, 读取配置信息,进行分组聚合操作。

5, 分组聚合操作以后,生生成1个新的dataframe2。

6, dataframe2的字段根据配置进行转换。

7, 将dataframe2 的数据进行输出CSV。

8, 处理完以后,反馈1个结果(当前报表处理的结果)。

9, 根据配置文件,处理所有其他的报表。

8.1.4 输出数据(O)

输出数据的表样见下图:

8-4 报表输出模型

​ 图 8-4 报表生成的处理模型

文件处理完毕以后的,返回值为:

{ QLR201902071000-re-site.csv; [是否生成成功, 文件大小]

QLR201902071000-re-customer.csv;[是否生成成功, 文件大小]}

8.2 组件方案选型

​ 从本章开始,我们将接触大数据处理中的更高的层级,也就是从data 到information的level。 所以要选择一款合适的的中间件,而幸运的是python中提供了强大的数据ETL插件pandas,基于pandas你可以非常方便的对数据进行各种各样的预处理。

​ 使用pandas有以下主要的优势。

1, 完全在内存中进行运算,效率比较高。

2,媲美SQL的强大数据处理能力,同时支持可编程能力。

3,利用一些插件可以快速扩展为多进程,以及多机并行集群计算。

8.2.1 Pandas简介

Pandas [1] 是python的一个数据分析包,最初由AQR Capital Management于2008年4月开发,并于2009年底开源出来,目前由专注于Python数据包开发的PyData开发team继续开发和维护,属于PyData项目的一部分。Pandas最初被作为金融数据分析工具而开发出来,因此,pandas为时间序列分析提供了很好的支持。 Pandas的名称来自于面板数据(panel data)和python数据分析(data analysis)。panel data是经济学中关于多维数据集的一个术语,在Pandas中也提供了panel的数据类型。

8.2.2 数据结构

Series:一维数组,与Numpy中的一维array类似。二者与Python基本的数据结构List也很相近。Series如今能保存不同种数据类型,字符串、boolean值、数字等都能保存在Series中。

Time- Series:以时间为索引的Series。

DataFrame:二维的表格型数据结构。很多功能与R中的data.frame类似。可以将DataFrame理解为Series的容器。

Panel :三维的数组,可以理解为DataFrame的容器。

8.2.3 apply()函数

参考 https://blog.csdn.net/qq_19528953/article/details/79348929

​ apply函数是pandas里面所有函数中自由度最高的函数。该函数如下:

DataFrame.apply(func, axis=0, broadcast=False, raw=False, reduce=None, args=(), **kwds)

该函数最有用的是第一个参数,这个参数是函数,相当于C/C++的函数指针。

这个函数需要自己实现,函数的传入参数根据axis来定,比如axis = 1,就会把一行数据作为Series的数据结构传入给自己实现的函数中,我们在函数中实现对Series不同属性之间的计算,返回一个结果,则apply函数会自动遍历每一行DataFrame的数据,最后将所有结果组合成一个Series数据结构并返回。

8.2.4 groupby() 函数

8-5 pandas分组模型

​ 8-5 pandas 的分组操作

groupby() 函数是一个常见的聚合函数,聚合的意涵是根据根据特定的字段,或者字段的组合进行分组,分组以后进一步针对其他的期望聚合字段进行一些函数操作。

​ 从上图可见, 标色的列是用于分组的字段,经过分组以后,将期望聚合的字段进行聚合处理,这样就生成了新的字段(聚合结果字段),分组操作的核心是将行记录进行合并操作。

图8-6 分组-聚合操作的意涵

​ 图8-6 分组-聚合操作的意涵

从上图可见,分组-聚合操作的意涵是从1个数据框,转换为另外1个数据框。也就是上图所显示的,从DF1转换为DF2。

大致的处理过程为:

1、选择一个源数据框DF1, 从中筛选出分组字段 add_field, 分组

字段可能是1个,也可能是多个。

2、接下来从源数据框DF1,中筛选出聚合字段agg_fields集合, 同时针对每个agg_field确定操作列表,也就是说每个agg_field 可以配置1个或者N个聚合操作。

3,聚合函数groupby func()首先根据分组字段 add_field,将源数据框DF1分成若干个组,假设是N组。

4,聚合函数接下来选取1组数据,假设有M行。 提取其中的1个聚合字段, 将同1列的M个字段进行聚合操作,生出一个聚合字段合并结果, (假如这个聚合字段还有其他的聚合操作,继续生成其他聚合算法的聚合字段结果);依次处理其他的聚合字段。 这样就生成了1行聚合结果。

5,针对后续分组的数据分别进行计算,一共生成了N行数据, 这N行数据就构成了DF2.

从上面的操作过程,我们可以了解分组操作的可以降低源DF1的数据规模,得到一些更加小规模的目标数据DF2,而这些DF2的数据可以更加宏观的得到一些趋势的判断。 这个处理过程,就是从data到information的转换过程。

8.2.5 Series对象

​ Series是Pandas中的一维数据结构,类似于Python中的列表和Numpy中的Ndarray,不同之处在于:Series是一维的,能存储不同类型的数据,有一组索引与元素对应。下图的代码是Series的一个简单示例。

图8-7  Series简单示例

​ 图8-7 Series简单示例

与Numpy类似,Pandas的引入也使用语句“import pandas as pd”,pd作为Pandas的别名是通用写法;

第3行创建了一个名为s1的series对象,本例中传入的是一个包含4个整型数值的列表,传入元组、字典都是可以的,如果传入字典,字典的键key和值value将自动转换成series对象的索引和元素;

由s1的打印结果可知:series对象不仅包含数值,还包含一组索引,由于在创建series对象时没有指定索引,因此会默认使用非负整数当做索引。

​ 在series对象中,索引与元素之间是一种映射关系,元素在series对象中的有序存储是通过索引实现的,当传入字典创建series对象,可以通过指定索引的方式对series对象中的元素进行排序和过滤。

图8-8 字典和索引创建series对象

​ 图8-8 字典和索引创建series对象

对比字典scores和列表names我们发现:scores中的键顺序与names中元素顺序不一致,scores中没有的键为“tracy”的数据,names中没有“tom”;

第4行创建series对象s3时以索引(列表names)为依据,names中没有的元素(tom)将被过滤掉,scores中没有tracy的分数,将会被NaN代替;

从最后的打印结果可以看出:s3是以索引names排序的,由于NaN是浮点型,因此s3中的数据类型自动转换成float64。

8.2.6 Dataframe 对象

​ 在pandas 中还有一个非常重要的DF对象,代表了一个二维的数据对象。其中关键的标识是

Columns: 行

Indexs: 列

图 8-9 Data Frame的基础模型

​ 图 8-9 Data Frame的基础模型

在常见的apply()操作中,我们可以观察到如下的参数:

df = df.apply(add_timefield, axis=**'columns'**) if add_fields else df

代表的就是逐行操作。

假如 axis=‘index’**则是逐列操作。**

8.3 需求renfine 与扩展

为了更全面的理解pandas的函数功能,我们将相关的报表需求进行适当扩展, 优化以后的报表见下图:

图 8-10 扩展的交易报表

​ 图 8-10 扩展的交易报表

从上图可见,经过汇总操作以后,主要需要生成三个字段,分别是:

1, 交易次数

2, 交易金额汇总

3, 交易金额均值(新增的字段)

8.4 基于配置文件生成报表

​ 基于pandas开发有一个好处,就是可以基于配置的方案来确定报表的生成,所以,我们来看看如何基于配置文件来生成报表。

8.4.1 新增字段-配置算法

在本案例中,会新增加时间点的字段,相关的增加字段的规则可以配置,在这里通过1个函数来实现。

def add_timefield(s):
"""recieve a row Series ,transform it, then return the Series"""
s['timescale'] = str(s['交易时间'])[:-2]
return s

这里s 是一个Series 可以看成是 “某一行”的 字典。经过上面的操作,可以将输入的交易时间,转换为15分钟粒度的“时点”数据,并增加1个元素。

备注:s 是Series , s[‘timescale’] = …. 是字典增加或修改值的一种方法。

8.4.2 算法字典 re_cfg {}

​ 由于本项目中要生成两个报表,所以将两个报表的生成算法进行配置。

以营业网点级别的报表为例:

​ ‘re-site’:{

​ ‘groupby_fields’:[‘timescale’,‘交易网点’,‘交易类型’], #可对add_fields中定义的字段进行groupby操作

​ ‘agg_fields’:{‘交易金额’:[‘sum’,‘count’]}, #可对单个字段增加不同运算方法

​ ‘map_fields’:{} #对最终输出字段重命名

​ },

有了配置文件以后,就可以很方便的按需生成报表。

8.5 设计模型

8.5.1 利用apply() 新增一个字段

前面讲过apply() 函数提供了很大的弹性,可以为DF中增加字段处理,下面看看如何添加时点数据。

df = df.apply(add_timefield, axis=‘columns’) if add_fields else df

在这里 处理函数 add_timefield 来自配置文件的def add_timefield(s)方法。

而参数axis=‘columns’的意义是逐行处理。

相关的处理过程是

1、从DF1中提取一行数据

2、调用add_timefield(s)方法。将行数据传入一个字典S中, 利用

s[‘timescale’] = str(s[‘交易时间’])[:-2] 针对字典中增加1个元素,这个元素和“交易时间”元素有关联性。 添加新元素以后,返回S,针对原DF的1行数据进行更新。

3、逐行处理完毕以后,处理结束,此时DF1中增加1列,字段名称是’timescale’。

8.5.2 聚合处理

核心的聚合代码如下:

#根据配置执行grouby和aggegration操作

df_agg=df.groupby(agg_cfg['groupby_fields']).agg(agg_cfg['agg_fields'])

#根据是否2层columns,看是降为1层

   df_agg.columns = ['_'.join(c) for c in df_agg.columns]

为了理解这段代码,我们见以下的例子

8-x 分组聚合演示

从上面的处理中,经过聚合操作以后,columns对象可能有两行数据。

此时经过聚合操作处理,变成1行,就可以实现了一个降维处理。

而降维的方法,还是使用的列表生成式。

for c in df_agg.columns 得到的( 这里是1个列表生出式)。

1.5.3 对象模型

8-11 基于pandas的报表生成器的对象模型

​ 8-11 基于pandas的报表生成器的对象模型

​ 从上图可见,利用pandas强大的插件功能, 我们仅仅利用df 和df_agg这两个对象,就方便的实现了报表的转换处理。 由于pandas提供了强大的抽象能力,基本上我们可以使用一种面向过程的方式来生产需要的报表。

​ 同时利用字典 re_cfg{} 强大的表达能力。我们也很方便的将多个报表的配置信息集中管理,后期可以按需生成报表。

Starter
MicroServ
Tutorials
Blog