Skip to main content

Pywhio

官方 @ PywhiO. Python大数据, AI技术, 微服务, DRF+REACT前后端分离实践.

3 min read · November 24th 2019 (originally published at pywhio)

如何基于分段接收实现socket协议远程文件传输

流转换处理中,如果需要对流进行远程传输,socket是一种常用的机器间连接的协议,然而在文件传输时候,需要考虑文件的二进制传输,以及接受端的根据缓冲区大小实现对文件的分段接收的处理。

这里我们透过一个详细的案例,讲解如何通过socket实现文件的远程解压缩,这样可以帮助读者深入的理解socket的编程方式。

1. 远程文件解压

在前大数据时代,通常1台计算机,或者1台unix服务器就能够搞定巨大部分的信息化项目,所以项目的大部分部组件可以安装到1台服务器上,组件只需要在服务器内进行协同;但是到了大数据时代,因为数据的规模比较大,所以实施一个项目,往往需要多台机器实施项目,这就意味着项目的组件必须分布到不同的服务器上,组件之间必须跨服务器进行协同工作。虽然做事情的逻辑是一样的,然而跨机器处理的难度,远远大于在同一台机器的处理难度。虽然从单机处理,到多机处理, 从程序处理的角度是迈出了一小步,而从大数据的角度来看,则是迈出了关键的第一步。

1.1 用例故事描述

blog2-1-usecase

图 1-1 远程文件解压服务
从上图可见,我们假设整个应用部署在两台服务器上,其中:
  1. 服务器1:上部署了FTP文件采集服务,采集到的压缩格式的文件放置在zip file warehouse下,解压缩以后的文件放置在unzip file warehouse下。

  2. 服务器2:提供了解压缩服务,他可以接受压缩文件,返回解压缩的文件,以及解压操作的结果。

1.1.1 输入数据处理(I)

输入数据包含以下的三个要素 [zipfile path, unzipfile path, if_unzip.cfg],下面将相关的要素进行说明:

  1. zipfile path:用于保存原始压缩文件的绝对路径。

  2. unzipfile path:用于保存完成解压文件的绝对路径。

  3. if_unzip.cfg:用于配置提供远程解压服务的服务器地址,和相关的配置信息。

本实验阶段,先考虑使用1台远程解压服务器。希望能够兼容多台远程解压服务器的模式。

1.1.2 处理过程(P)

  1. 应用程序读取zipfile path,扫描所有需要解压的文件(暂时不考虑下层文件夹的问题),建立待解压的文件列表。

  2. 应用程序提取1个压缩格式的文件,调用本地文件解压接口,将压缩文件传递到远程的server2.

  3. Server2 接收到压缩文件以后,对文件进行递归解压操作,生成解压文件流,并生成解压缩格式的文件。文件暂存在本地的临时文件夹中。

  4. 全部文件解压完毕以后,server 侧生成发送列表:{filename1:file object1, filename2:file object2, filename n,fileobject n}

  5. Server2 生成解压缩结果,相关的结果为dict形式,

{layer1_file.zip:[ 解压结果,number of next layer zipfile,number of next layer unzipfile ], layer2_file1.zip: [ 解压结果,number of next layer zipfile,number of next layer unzipfile ]}。

备注:number of next layer zipfile,如果能够解压成功,可以判断下一层的压缩文件数量,非压缩文件的数量。解压结果取值0(成功),1(失败)。

  1. Server2回调相关的本地接口,将server 侧的完成解压文件回 传给到server1的服务器。本地接口将文件写入unzipfile path。

  2. Server2回调相关的本地接口,将server 侧的完成结果回传

  3. server1 根据server2的处理结果,生成当前zip文件的解压结果。

  4. server1 继续处理其他的文件,直到全部的文件加压缩完毕。

1.1.3 输出结果(O)

输出结果包含两个部分

  1. 文件解压概述:

[执行结果,解压缩文件总数,成功处理子文件数量] ]

其中执行结果: 0 成功, 1 失败 ,2 部分失败

子文件数量:3

成功处理子文件的数量:3

  1. 单个文件解压详情:

{file1.zip:[解压结果,remote server1, starttime,endtime, number of failed unzip file, number of unzip file received],

File2.zip:[解压结果,remote server1, starttime,endtime, number of failed unzip file, number of unzip file received]}

备注: starttime :记录开始调用接口的时间。

Endtime:接收到完成解压的文件,以及解压结果以后的时间.

1.2 用例分析

blog2-1-usecase

对用例进行分析,我们可以将server2看做远程解压的服务的服务端,将server1看做远程解压服务的客户端;client首先通过文件解压接口,将压缩文件上传至server,server的文件解压服务对文件进行解压,解压完毕后将解压缩文件上传至client,单个文件远程解压缩用例执行结束。

所以,我们需要建立从client传输压缩文件到server和从server传输解压缩文件到client的通信过程。

1.3 主要组件介绍

1.3.1 Client

1.3.1.1 Client(object)

在本方案中,封装了一个对象,实现了对客户端连接远程操作的功能,主要的对象结构见下图:

blog2-2-client-object

导入socket模块,调用socket函数创建一个socket对象,调用connect()函数连接建立与服务端的连接,调用send()、recv()函数发送接收数据,根据这些函数封装了以下方法:

  1. client.upload传入压缩文件绝对路径,将压缩文件传输给server

  2. client.download接收server接收的解压缩文件

  3. client.handle负责调用upload和handle,发送并处理与server之间的请求

1.3.1.2 Client.handle()

blog2-3-client-handle

​ 图 1-2 handle()函数

从上图可见,相关的函数的调用顺序简单描述如下:

  1. 函数 handle()调用函数upload(),传入压缩文件绝对路径,将压缩文件传入server。

  2. 函数handle()调用函数 pickle.loads() 接收从server侧回传的信息,其中包含了待接收的文件数量。

  3. 函数func handle()调用根据list file_number的实例数量,迭代调用函数download(self),接收相关的文件。

  4. 全部处理完毕以后,函数handle()返回接收文件的数量,和文件接收操作的结果。

1.3.1.3 Upload(self,filepath)

blog2-4-client-upload

图1-3 Upload() 函数

从上图可见见,相关的上传操作分为两步操作法。

1、 文件信息上传

1.1 在内存中生成message_dict{};

1.2 调用client.send() 方法发送数据。

1.3 将接收的响应传入server_response.

2、 文件上传

2.1 读取文件,生成文件句柄,读取方式为binary。

2.2 利用文件对象f的read()方法,将文件转入内存中的中间流data中。

2.3 调用client.sendall(data) 方法,将中间流发送到远方。

2.4 将接收的响应传入upload_response.

1.3.1.4 Download(self)

blog2-5-client-download

相关的接收操作中, 相关的处理过程为两步处理法。

1接收文件信息。

1.1 发送一个200,声明准备开始接收文件信息。

1.2 从socket上接收到data( 是序列化传输的数据)

1.3 将接收到的data进行反序列化处理,反序列化的结果是一个dict

,即是message_dict{}

2、接收文件

2.1 根据message_dict{}信息,获取其中的文件的大小。(作为分片接收的判断标准)。

2.2 根据待接收文件的路径,生成一个文件句柄,其中文件的模式为binary。

2.3 生成一个中间流content,并生成一个变量receivesize,初始值为0.

2.4 开始启动文件接收,从soket上接收的二进制数据,放置在data中。

2.5 将data中的数据拷贝到content中,并将data的大小append到receivesize中。

2.6 持续进行后续的接收,直到receivesize的大小和接收的文件大小一致。

2.7 将内存中中间流content拷贝到file句柄中。

2.8 想server端回传相关的文件接收完毕的信息。

1.3.2 Server

1.3.2.1 FtpHandler(socketserver.BaseRequestHandler)

​ 本方案中,封装了一个对象FtpHandler(), 实现相关的服务器端的处理。

blog2-6-server

导入socketserver模块,创建一个处理请求类FtpHandler,实例化这个类创建服务器来接收处理请求

  1. server.uzip传入压缩文件绝对路径和解压缩文件存放的路径,对压缩文件进行解压

  2. server.FtpHandler处理请求的类,继承BaseRequestHandler,封装了upload、download和handle函数,功能与client功能一致

1.3.2.2 Server.handle()

blog2-7-serverhandle

在相关的处理中,主要是三步处理法

1, 从接口上接收压缩文件。

2、调用本地的解压缩文件。

3,将解压后的文件,迭代发送。

1.3.2.3 其他过程

这里,上传,下载的过程和客户端类似。

1.4 client-server 交互关系

1.4.1 解题思路

1.4.1.1 解题方案

blog2-8-serverinterwork

  1. 准备两个linux服务器server1和server2,在server1中创建client.py作为客户端,在server2中创建server.py作为服务端

​ 2.在server.py中导入serversocket模块,然后创建处理请求的类FtpHandler,继承BaseRequestHandler类并重写父类的handle()方法,在handle中处理客户端的请求;然后实例化socketserver(TCPserver)类的对象,传入服务器地址和创建处理请求的类,最后调用serve_forever()接收客户端传过来的多个请求

  1. 在client.py中,导入socket模块,创建Client类,初始化socket对象,创建connect方法连接server,创建handle方法处理server的请求
  1. 建立从client传输压缩文件到server的通信过程,在client.py中,创建upload方法负责将压缩文件数据流传输至server,在server.py中创建download方法负责接收处理client传输过来的数据流
  1. 在upload函数中,传入文件的绝对路径,获取文件的文件和字节大小以字典形式存储(msg_dict),调用send方法将msg_dict发送给server,由于send方法只能传输字节类型数据,所以导入pickle模块,调用pickle.dumps()将msg_dict序列化再进行传输,在server.py中,在handle函数创建接收client消息的循环,如果接收的消息为空就中断循环,如果不为空就调用download函数接收client发送的消息,调用pickle.loads()将接收的消息反序列化为msg_dict
  1. 在upload函数中创建二进制读文件的句柄,将读取到的二进制数据调用sendall方法发送给server,在download函数中创建二进制写文件的句柄,调用recv方法接收client传输的二进制数据,由于recv方法接收的数据的大小有限(默认1024),所以创建接收数据的循环,当已接收的数据小于文件大小,就接收数据,当接收完数据就将所有二进制数据写入文件,client传输压缩文件到server的通信过程结束
  1. 在server.py中创建unzip函数对文件进行递归解压,在server.py的handle函数中,调用unzip函数对压缩文件进行解压,得到若干解压缩文件
  1. 构建server传输解压缩文件到client的通信过程,将解压缩文件传输到client,通信过程逻辑与client到server一样,只是变换了send和recv的角色,所以在client和server中再创建和调用upload和download函数即可
  1. 用tryexcept捕获函数执行异常,输出函数执行返回值

函数执行完毕客户端控制台输出截图:

blog2-9-clientprint

输出结果说明:输出结果以字典形式展现,key为压缩文件名,value为列表形式的解压缩结果输出,0代表解压成功,1代表解压失败,其余分别代表解压起始时间,解压成功后的子文件数量或解压失败抛出的错误

函数执行完毕服务端控制台输出截图:

blog2-10-serverprint

1.4.1.2 流转换穿越

数据转换传输流程图

blog2-11-decompresspipeline

​ 图2 数据转换流程图

  1. 获取压缩文件的文件名和文件字节大小,以字典形式存储(msg_dict)

  2. client将msg_dict序列化成二进制数据发送给接口

  3. server接收数据,将数据反序列化得到msg_dict

  4. server根据msg_dict创建写文件的句柄

  5. server发送‘200 ok’通知client可以进行发送文件

  6. client接收到‘200 ok’

  7. client对压缩文件以二进制形式进行只读

  8. client将读取的数据发送给server

  9. server接收到数据

  10. server将接收到的数据写入压缩文件

  11. unzip函数对压缩文件解压得到解压缩文件

  12. server将解压缩文件传给client,交换client和server角色,重复上述步骤。

1.5 关键设计考虑点

1.5.1 流转换分段接收

1.5.1.1 分段接收问题概述

​ 由于socket接收进程在接受文件流的时候,缓冲区的大小有限制,必须采取分段接收的策略,这对服务处理会代理额外的逻辑,如果处理的不妥,会导致TCP粘包的问题。

1.5.1.2 TCP粘包

TCP粘包

产生粘包的情况有两种

  1. 由于产生粘包的原因是接收方的无边界接收,因此发送端可以在发送数据之前向接收端告知发送内容的大小即可。在本次用例中由于一次性发送文件读数据太大,所有在进行文件传输之前发送一个msg_dict携带文件大小参数作为请求报头传输给服务器,服务器根据大小设置循环接收数据的次数。

    2.当连续发送数据时,由于tcp协议的算法,会将较小的内容拼接成大内容,一次性发送到服务器端,因此造成粘包。在本次用例中,因为发送的msg_dict内容较小,client又要发送文件的二进制数据,所以会造成粘包,所以在server接收到client的msg_dict之后,让client接收服务端发送的‘200 ok’来避免连续发送造成的粘包情况。

3.由于recv方法每次接受的数据大小有限,在将接收的数据写入文件的时候,不要在循环中写入,在循环结束后消息全部消息接收完毕对数据统一进行写入

1.5.1.3 流传输client-server互操作时序

blog2-12-decompress-stream

时序图说明:

客户端通过socket创建的实例对象来发送接收请求,服务端通过socketserver创建的实例对象来发送接收请求。

  1. 客户端发送msg_dict到服务端,服务端接收msg_dict

client.send(pickle.dumps(msg_dic))

pickle.loads(request.recv(1024).strip())

  1. 服务端发送‘200 ok’到客户端,客户端接收‘200 ok’

request.send(‘200 ok’.encode(‘utf-8’))

client.recv(1024)decode(‘utf-8’)

  1. 客户端发送文件的二进制读取数据,服务端接收接二进制读取数据

    client.sendall(zipfile.read)

​ request.recv(1024)

  1. 客户端服务端交换角色,重复上述发送接收步骤

1.5.2 二进制steam传输

1.5.2.1 远程调用的二进制传输问题

​ 在本案例中,需要远程传输字典和文件,在远程传输的情况下,必须采用二进制的方案,否则会产生错误。

1.5.2.2 解决思路

blog2-13-decompress-binary

​ 图4:client-server之间的互操作

由于数据在传输过程,数据格式只能是二进制格式,所以msg_dict首先被pickle.dumps()函数转化为二级制数据进行传输,然后pickle.loads()对数据进行解析得到msg_dict,zipfile以二进制方式进行读取再传输,然后再以二进制方式写入。

1.5.2.3 Pickle序列化

​ 在本方案中,需要传输文件信息给对端,相关的字典信息,是基于pickle()方案传输。

python的标准模块,实现了基本数据的序列化与反序列化

序列化操作:

pickle.dump()将序列化后的对象obj以二进制形式写入文件file中,进行保存,注意要以二进制形式写入

pickle.dumps()与dump的唯一却是不需要写入文件,直接返回一个序列化对象

反序列化操作

pickle.load()将序列化的对象从文件file中读取出来。

pickle.loads()直接从bytes对象中读取序列化信息

1.5.3 Socket server

socketserver

blog2-13-decompress-socketserver

  1. socketserver是对socket的再封装,实现了多并发,它包含四个基本的服务器类,常用的有TCPserver和UDPserver两种类型,以及两个不常用的UnixStreamServer和UnixDatagramServer;类之间的继承关系为TCPServer继承基础类,UDPServer和UnixStreamServer继承TCPServer,UnixDatagramServer继承自UDPServer,后两个不常用的类类似于tcp、udp类,但是使用的是Unix域套接字,在非linux平台不可使用。
  1. 这四个类同步处理请求;必须先完成每个请求,然后才能开始下一个请求。如果每个请求都需要很长时间才能完成,这是不合适的,因为它需要大量的计算,或者因为它返回了很多客户端处理缓慢的数据。解决方案是创建一个单独的进程或线程来处理每个请求。the ForkingMixInThreadingMixInmix-in类可用于支持异步行为。
  1. 创建一个socket服务器,首先创建一个请求处理类,继承BaseRequestHandler类并重写父类的handle()方法,重写的这个方法会处理传入的请求,然后实例化一个服务器类(TCPserver或UDPserver等),将服务器地址和创建的请求处理类这两个实例化参数出入,然后调用handle_request()处理一个请求、serve_forever()处理多个请求,最后调用serve_close()来关闭socket连接

4.BaseRequestHandler这是所有请求处理程序对象的超类,它定义了接口,具体处理程序的子类必须定义一个新的handle()方法,该方法必须完成服务器请求的所有工作,它具有一个实例化属性,sel.request作为请求对象。

  1. BaseServer是模块中所有Server对象的超类,它封装了一下方法:

handle_request()处理单个请求

serve_forever() 处理请求,知道明确停止处理位置shutdown()

shutdown() 告诉serve_forever()停止循环

serve_close() 停止服务器

Previous

如何利用celery的canvas组件实现分布式微服务整合
Starter
MicroServ
Tutorials
Blog