HDFS WebHDFS 读写文件分析及HTTP Chunk Transfer Coding相关问题探究

文章目录

  • 前言
  • 需要回答的问题
    • DataNode端基于Netty的WebHDFS Service的实现
  • 基于重定向的文件写入流程
    • 写入一个大文件时WebHDFS和Hadoop Native的块分布差异
  • 基于重定向的数据读取流程
    • 尝试读取一个小文件
    • 尝试读取一个大文件
  • 读写过程中的Chunk Transfer-Encoding支持
    • 写文件使用Chunk Transfer-Encoding
    • 读文件使用Chunk Transfer-Encoding
      • Response Header中为什么没有Transfer-Encoding: chunked
      • 测试WebHDFS是否支持chunk Transfer-Encoding时的一个错误导致的错误判断
  • WebHDFS的特性和不足总结

前言

最近在调研使用WebHDFS进行HDFS文件读写的相关调查,因此需要对WebHDFS的整个读写流程进行探究,其中涉及到的必要的http重定向的整个流程必须搞清楚。
同时,由于HDFS涉及到大量的流式写和大文件读,因为我们比较关心WebHDFS对Chunked Transfer Coding的支持,我们对WebHDFS的这个特性进行了测试和代码层面的研究。

需要回答的问题

我们基于常规的hadoop-native 方式连接去读写文件的时候,会首先与NameNode通信,然后再与DataNode通信读取具体数据,这其中的处理逻辑都放在了hadoop-native中的具体客户端代码中。
但是一旦使用WebHDFS,我们所有的通信方式仅仅是HTTP。hadoop-native中的这种多次通信逻辑放在一个http请求中该怎么处理呢?所以,我们在测试WebHDFS以前,就首先有了疑问: 基于HTTP的WebHDFS该怎么处理先到NameNode然后到DataNode的整个过程?总不会让数据从DataNode、经过NameNode再到客户端吧?

DataNode端基于Netty的WebHDFS Service的实现

在DataNode启动的时候,会启动一个DatanodeHttpServer,用来启动对应的WebHDFS Service:

  private void startInfoServer()throws IOException {// SecureDataNodeStarter will bind the privileged port to the channel if// the DN is started by JSVC, pass it along.ServerSocketChannel httpServerChannel = secureResources != null ?secureResources.getHttpServerChannel() : null;httpServer = new DatanodeHttpServer(getConf(), this, httpServerChannel);httpServer.start();

DataNodeHttpServer 构造的时候,实际上构造了基于Netty的DataNode Http Service。我们特别注意到,在initChannel()方法中,添加了对Chunked Transfer Encoding的支持handler ChunkedWriteHandler。后面我们会具体讲这个类在我们通过WebHDFS读取文件时的作用。

      this.httpServer = new ServerBootstrap().group(bossGroup, workerGroup).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();p.addLast(new HttpRequestDecoder(),new HttpResponseEncoder());if (restCsrfPreventionFilter != null) {p.addLast(new RestCsrfPreventionFilterHandler(restCsrfPreventionFilter));}p.addLast(new ChunkedWriteHandler(), //支持chunk的数据写入new URLDispatcher(jettyAddr, conf, confForCreate));}});

上面的代码其实是创建一个Netty服务器端的基本流程(以下基本流程来自ChatGPT的回答,个人认为非常准确,因此直接粘贴):

  • 创建 EventLoopGroupEventLoopGroup 是一个处理 I/O 操作的线程池。在服务器端,我们通常需要创建两个 EventLoopGroup,一个用于接受客户端的连接(Boss Group),另一个用于处理连接的数据流(Worker Group)。

    EventLoopGroup bossGroup = new NioEventLoopGroup();
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    
  • 创建 ServerBootstrapServerBootstrap 是用于启动 Netty 服务器的引导类。它配置了服务器的各种参数,如线程模型、通道类型、处理器等。

    ServerBootstrap serverBootstrap = new ServerBootstrap();
    serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new MyChannelInitializer()); // 设置处理连接的 ChannelInitializer
    
  • 设置 ChannelInitializerChannelInitializer 是一个特殊的处理器,用于配置新连接的 ChannelPipeline。在这里,你可以添加自定义的处理器来处理连接的数据流。在上面的HDFS代码中,创建了一个匿名的ChannelInitializer实现,重写了initChannel()方法:

    this.httpServer = new ServerBootstrap().group(bossGroup, workerGroup).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();p.addLast(new HttpRequestDecoder(),new HttpResponseEncoder());.....}
    
  • 实现自定义的处理器: 在 ChannelInitializerinitChannel()方法中,添加Netty集成的或者自定义的处理器ChannelHandler,用于处理连接的数据流。这些处理器会成为一个处理器链,挂载在ChannelPipeline中。很显然,这个Pipeline下面的Handler时按照addLast()添加的顺序有序的,比如WebHDFS在下面的代码中,意味着读数据的时候,数据先经过ChunkedWriteHandler,然后经过URLDispatcher,而向客户端写数据的时候,数据流先经过URLDispatcher,然后经过ChunkedWriteHandler

      p.addLast(new ChunkedWriteHandler(), //支持chunk的数据写入new URLDispatcher(jettyAddr, conf, confForCreate));
    
  • 绑定端口并启动服务器: 最后,将服务器绑定到指定的端口,并启动服务器。

    ChannelFuture f = httpServer.bind(infoAddr);try {f.syncUninterruptibly();......
    

在DataNode端, 用户可以通过dfs.datanode.http.address配置了对应的WebHDFS地址,默认是0.0.0.0:9864

  public static final String  DFS_DATANODE_HTTP_ADDRESS_KEY = "dfs.datanode.http.address";public static final int     DFS_DATANODE_HTTP_DEFAULT_PORT = 9864;public static final String  DFS_DATANODE_HTTP_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_DATANODE_HTTP_DEFAULT_PORT;

基于重定向的文件写入流程

写入一个大文件时WebHDFS和Hadoop Native的块分布差异

我们通过基础的java.net提供的HttpURLConnection创建Http客户端向HDFS写入文件。

通过命令, 可以看到其实这个文件的第一个Block都是在同一个机器上。
hdfs -copyFromLocal这种基于hadoop native的api拷贝文件,这个文件的block是逐个申请的,NameNode会为每一个Block根据当前的BlockPlacementPolicy分配对应的replica位置,但是,基于WebHDFS的NameNode只能在请求写文件的时候和NameNode进行一次通信,返回一个Location信息供客户端进行Redirect的写操作,因此,这导致这个文件的所有Block的第一个Replica都在这台机器上。至于每一个Block剩下的Replica的复制,按照HDFS的块写入策略,都是由第一个Replica复制到剩余两个Replica上去的。

我们做测试的时候,通过WebHDFS上传一个300MB的文件到HDFS。之所以选择300MB,是因为HDFS的块大小是128MB,因此300MB的文件需要创建3个Block,从而观察3个Block的第一个 Replica 是否是同一台机器。
下面是使用Java的HTTP客户端基于WebHDFS向HDFS写入一个大概300MB的文件的代码:

public class WebHDFSWriteFileExample
{public static void write_file(){try {// WebHDFS endpointString webHdfsEndpointProd = "http://dddddd-101.prod.corp.com:50070";// HDFS path where the file will be writtenString hdfsPath = "/user/hadoop/hello-world/330MB.txt";// Local file path to be uploadedString localFilePath = "/Users/cwu/330MB.txt";// Hadoop usernameString username = "hadoop";// WebHDFS REST API URL for file creation (PUT method)String webHdfsUrl = webHdfsEndpointProd + "/webhdfs/v1" + hdfsPath + "?op=CREATE&user.name=" + username + "&overwrite=true";// Open the local file for readingFileInputStream fileInputStream = new FileInputStream(localFilePath);BufferedReader reader = new BufferedReader(new InputStreamReader(fileInputStream));// Create HTTP connectionHttpURLConnection connection = createConnectionWithURL(webHdfsUrl);// Write file content to HDFSOutputStream outputStream = connection.getOutputStream();String inputLine;while ((inputLine = reader.readLine()) != null) {outputStream.write(inputLine.getBytes());outputStream.write("\r\n".getBytes());}outputStream.write("0\r\n\r\n".getBytes());// Close the connectionconnection.disconnect();}}// 基于WebHDFS的url创建一个HttpURLConnectionprivate static HttpURLConnection createConnectionWithURL(String webHdfsUrl) throws IOException {URL url = new URL(webHdfsUrl);HttpURLConnection connection = (HttpURLConnection) url.openConnection();connection.setDoOutput(true);connection.setRequestMethod("PUT");return connection;}}

文件上传以后,我们通过fsck命令查看上传的文件的块分布情况,结果验证了我们的猜想:文件的所有Block的第一个replica都是机器81.2.20.332,显然,这个机器就是我们向NameNode发送请求的时候NameNode返回给我们的307重定向的DataNode地址。重定向地址只有一个,因此,所有的块的写入就只能写到一个机器上去。

root@dddddd-101:~# HADOOP_USER_NAME=hdfs hdfs fsck hdfs://drdencrypted/user/hadoop/hello-world/330MB.txt -files -locations -blocks
Connecting to namenode via http://dddddd-101.prod.corp.com:50070/fsck?ugi=hdfs&files=1&locations=1&blocks=1&path=%2Fuser%2Fhadoop%2Fhello-world%2F330MB.txt
FSCK started by hdfs (auth:SIMPLE) from /81.2.2.108 for path /user/hadoop/hello-world/330MB.txt at Wed Jan 10 14:49:14 UTC 2024
/user/hadoop/hello-world/330MB.txt 348093376 bytes, 3 block(s):  OK
0. BP-332385978-81.2.1.108-1654488140099:blk_5650223226_4580715954 len=134217728 Live_repl=3 [DatanodeInfoWithStorage[81.2.20.332:50010,DS-fa64ea42-f4e8-4ebb-999e-1727fcfb879a,DISK], DatanodeInfoWithStorage[81.2.12.132:50010,DS-88228db5-1c5b-4715-8b36-2bb653201367,DISK], DatanodeInfoWithStorage[81.2.11.125:50010,DS-e6936497-52f8-495e-9d80-0d7d5684369a,DISK]]
1. BP-332385978-81.2.1.108-1654488140099:blk_5650225022_4580717750 len=134217728 Live_repl=3 [DatanodeInfoWithStorage[81.2.20.332:50010,DS-c0cb79b2-7679-4961-8bb4-917224b248ed,DISK], DatanodeInfoWithStorage[81.2.8.131:50010,DS-368a6917-1a36-419f-a6bc-7129978d0926,DISK], DatanodeInfoWithStorage[81.2.8.132:50010,DS-56b945ff-004f-4af5-879e-2941db14693e,DISK]]
2. BP-332385978-81.2.1.108-1654488140099:blk_5650228459_4580721187 len=79657920 Live_repl=3 [DatanodeInfoWithStorage[81.2.20.332:50010,DS-0bf35d03-c89c-4e9a-86ce-af9c1a149b52,DISK], DatanodeInfoWithStorage[81.2.11.126:50010,DS-74da8386-1369-4562-bfec-ae6350b48fbd,DISK], DatanodeInfoWithStorage[81.2.12.132:50010,DS-9bcd388e-a129-4f5c-99b3-6611e59f5a12,DISK]]

因此,通过WebHDFS写文件其实是有一个隐含的重定向过程,但是读者看上面的代码,并没有看到任何重定向的处理逻辑,这是因为,在默认情况下,HttpURLConnection帮我们自动处理了重定向,即,如果服务器端返回307状态码,那么客户端自动重新建立到header中的Location中到连接,然后一样的请求发送给redirect地址,这一切对用户隐藏。
如果我们的确想观察重定向的详细过程,完全可以通过connection.setInstanceFollowRedirects(false);关闭客户端的自动重定向,手动处理重定向。同时,后面会讲,假如我们enable了Chunked Transfer Coding,自动重定向处理也将被关闭。
没有了自动重定向,就需要手动创建两次http请求,第一次请求发往NameNode,NameNode返回307,并附带了指向某台DataNode的Location,然后我们再依据Location第二次建立连接,将文件写入到对应的DataNode。因此基于上面的WebHDFSWriteFileExample,我们修改代码进行手动重定向以观察重定向细节:

public class WebHDFSWriteFileExample
{public static void write_file(){try {....... 发送请求到NameNodeif (responseCode == HttpURLConnection.HTTP_CREATED) {System.out.println("File created successfully on HDFS.");} else { // 发现了重定向System.out.println("Error creating file on HDFS. Response Code: " + responseCode);String newUrl = connection.getHeaderField("Location"); //获取header中的Location,即重定向的目标地址connection.disconnect(); // 关闭原先的对NameNode的连接,开始创建针对DataNode的连接HttpURLConnection connection2 = createConnectionWithURL(newUrl);// Write file content to HDFSoutputStream = connection2.getOutputStream();fileInputStream = new FileInputStream(localFilePath);reader = new BufferedReader(new InputStreamReader(fileInputStream));while ((inputLine = reader.readLine()) != null) {outputStream.write(inputLine.getBytes());}}}private static HttpURLConnection createConnectionWithURL(String webHdfsUrl) throws IOException {.....connection.setInstanceFollowRedirects(false); //disable掉自动重定向......
}

基于重定向的数据读取流程

我们将分别测试和研究读取小文件(不足一个Block)和大文件(两个或者更多Block)的读取流程,二者理论上会产生区别,因为假如文件来自两个或者更多的Block,基于WebHDFS的重定向似乎就没那么简单了。
下文来讲解具体的读取过程。

尝试读取一个小文件

对于只占用一个Block的小文件,我们发送读取请求给NameNode,NameNode会返回一个307的重定向,这个重定向一定是指向这个唯一的Block的其中一个Replica所在的DataNode的。
比如,读取一个1MB的小文件的代码如下:

public class WebHDFSReadFileExample {private static void read(String[] args){try {// WebHDFS endpointString webHdfsEndpointProd = "http://dddddd-101.prod.corp.com:50070";// HDFS path where the file will be writtenString hdfsPath = "/user/hadoop/hello-world/1MB.txt";// Local file path to be uploadedString localFilePath = "/Users/cwu/1MB.txt";// HDFS path to the file// Hadoop usernameString username = "hadoop";// WebHDFS REST API URL for file reading (GET method)String webHdfsUrl = webHdfsEndpoint + "/webhdfs/v1" + hdfsPath + "?op=OPEN&user.name=" + username;// Open the connectionHttpURLConnection connection = createConnectionWithURL(webHdfsUrl, false, "GET", false);if (connection.getResponseCode() == HttpURLConnection.HTTP_OK) {System.out.println("File created successfully on HDFS.");} else if (connection.getHeaderField("Location") != null){ // 307 redirect// 创建http连接,基于get请求读文件,关闭自动重定向,不enable chunkHttpURLConnection connection2 = createConnectionWithURL(newUrl, false, "GET", false);// Write file content to HDFSBufferedReader reader = new BufferedReader(new InputStreamReader(connection2.getInputStream()));// read the file from BufferReader.........}....}// 创建链接,用户可以选择是否打开自动重定向,是否使用chunkprivate static HttpURLConnection createConnectionWithURL(String webHdfsUrl, Boolean autoRedirect, String method, Boolean enable_chunk) throws IOException {URL url = new URL(webHdfsUrl);HttpURLConnection connection = (HttpURLConnection) url.openConnection();connection.setInstanceFollowRedirects(autoRedirect); // 关闭自动重定向connection.setRequestMethod(method);connection.setDoOutput(true);if(enable_chunk)connection.setChunkedStreamingMode(-1); //选择是否打开chunked streaming out。后面会说,该选项决定的是客户端的写,基于GET读取文件的时候,该选项不可以打开return connection;}

读取一个小文件的过程在预期之内,HttpClient首先与NameNode 建立Http连接, NameNode响应了307状态码同时在Response Header中包含了目标location的地址信息。在我们关闭了自动重定向的情况下,建立了与Location中指示的URL建立连接,然后读取到对应的文件。

尝试读取一个大文件

我们已经知道读取文件的时候,NameNode会返回一个重定向地址给我们,告诉我们去联系对应的DataNode读取对应数据。但是,假如这个文件不止一个block,那么,是不是在读取下一个block的时候DataNode也会返回下一个DataNode的地址给我们接着进行下一个Block的读取呢?

我们创建了一个200MB的文件,显然,这个文件会有两个Block,并且,我们从fsck可以看到,这个文件的两个块的总共6个副本,分布在6台不同的机器上。

root@rccp103-2d:/var/log/hadoop-hdfs# HADOOP_USER_NAME=hdfs hdfs fsck hdfs://rccp103-2d.iad5.prod.corp.com:8020/user/hadoop/hello-world/200MB.log -files -locations -blocks
Connecting to namenode via http://rccp103-2d.iad5.prod.corp.com:50070/fsck?ugi=hdfs&files=1&locations=1&blocks=1&path=%2Fuser%2Fhadoop%2Fhello-world%2Fhadoop-cmf-hdfs-NAMENODE-rccp103-2d.iad5.prod.corp.com.log.out.1
FSCK started by hdfs (auth:SIMPLE) from /71.9.2.108 for path /user/hadoop/hello-world/hadoop-cmf-hdfs-NAMENODE-rccp103-2d.iad5.prod.corp.com.log.out.1 at Thu Jan 11 08:38:25 UTC 2024
/user/hadoop/hello-world/hadoop-cmf-hdfs-NAMENODE-rccp103-2d.iad5.prod.corp.com.log.out.1 209715453 bytes, 2 block(s):  OK
0. BP-332385978-71.9.1.108-1654488140099:blk_5655503893_4585996621 len=134217728 Live_repl=3 [DatanodeInfoWithStorage[71.9.9.131:50010,DS-a8904c39-cb1c-4fe0-b9e8-625f3bad85be,DISK], DatanodeInfoWithStorage[71.9.7.130:50010,DS-8e0d756e-bab4-4b44-b131-6bb9af01362b,DISK], DatanodeInfoWithStorage[71.9.8.128:50010,DS-622eb32d-e292-4824-818e-a72a12144fe3,DISK]]
1. BP-332385978-71.9.1.108-1654488140099:blk_5655503948_4585996676 len=75497725 Live_repl=3 [DatanodeInfoWithStorage[71.9.10.128:50010,DS-3e0bbada-d77c-46af-a1fa-573e54443706,DISK], DatanodeInfoWithStorage[71.9.12.129:50010,DS-7942484b-2b8e-438a-9d4b-8bf8d892c0cf,DISK], DatanodeInfoWithStorage[71.9.12.131:50010,DS-d2e9c177-4ea0-4232-9f28-f8a52644be8a,DISK]]

我们还是使用上面的WebHDFSReadFileExample代码,发现,读取文件的时候,NameNode返回了307重定向,并且指向了第一个块的某一个副本机器,文件的第一个和第二个块都是从这个机器上读取的,尽管,第二个块并不是分布在这台机器上

我们检查代码,发现了其实现的原理,简而言之,第一个Block的Replica的机器,充当了整个文件读取的中转,而不是仅仅负责自己存储的那个Replica的读取。
DataNode通过向Netty注册的WebHdfsHandler这个自定义的ChannelHandler来处理发往DataNode请求(上文讲过Netty的一些基本实体的定义)。WebHdfsHandler.onOpen()方法用来处理用户的读文件请求。我们从下面的onOpen()代码的实现细节可以看到,实际上,DataNode并不是将用户请求的文件在自己上面的Replica返回给Http客户端,而是将自己作为中转,创建一个Hadoop Client,读取文件,然后将读取到的文件的InputStream封装程一个ChunkedStream返回给客户端。所以,当客户端调用connection2.getInputStream()就能够拿到对应DataNode创建的读取文件的InputStream

 private void onOpen(ChannelHandlerContext ctx) throws IOException {// DataNode自己创建一个读取文件的Client,这是一个普通的Hadoop Native客户端,与普通的HDFS Java 客户端没有任何去呗final DFSClient dfsclient = newDfsClient(nnId, conf);HdfsDataInputStream in = dfsclient.createWrappedInputStream(dfsclient.open(path, bufferSize, true));in.seek(offset);data = in;ctx.write(resp);ctx.writeAndFlush(new ChunkedStream(data) { // 封装好InputStream,返回给客户端,可以看到,是以Chunk的方式。后面会讲,ChunkedStream会被当前的Handler中的下一个Handler即ChunkedWriteHandler进一步处理@Overridepublic void close() throws Exception {super.close();dfsclient.close();}}).addListener(ChannelFutureListener.CLOSE);}

读写过程中的Chunk Transfer-Encoding支持

Chunk Transfer-Encoding的支持会极大影响流式数据读写的性能。我们看看什么是Chunk Transfer-Encoding以及它能给我们带来什么:

分块传输编码(Chunk Transfer-Encoding)是一种 HTTP 传输机制,它将消息主体分割成一系列块,并使用长度前缀来标识每个块的大小。这种编码方式带来了一些好处:

  • 实时传输: 分块传输编码允许服务器在生成响应的同时将数据发送给客户端,而不必等到整个响应主体准备就绪。这对于实时生成的内容或流式传输非常有用。

  • 无需预先知道内容大小: 使用分块传输编码时,不需要预先知道整个消息主体的大小。这对于动态生成的内容或流式传输,其中内容的大小可能在生成过程中动态变化,非常有用。

  • 提高响应速度: 分块传输编码可以在数据准备的同时发送,减少了等待整个响应准备就绪的时间。这有助于提高响应速度,特别是对于大型或动态生成的内容。

  • 降低延迟: 分块传输编码允许客户端在接收到一部分数据时就开始处理,而不必等待整个响应完成。这有助于降低延迟,特别是对于实时应用或交互性应用。

  • 更好的适应网络波动: 在网络条件不稳定的情况下,分块传输编码可以更好地适应不同的带宽和延迟条件。由于可以在生成数据的同时发送,它能够更灵活地处理网络波动。

所以,总的说来,Chunk Transfer-Encoding主要用来文件大小未知的流式文件的传输,或者虽然确定大小但是文件大小很大的传输场景。

正常使用场景下,流式数据或者大文件的发送方会在http header中设置Transfer-Encoding: chunked,同时以chunk的方式发送数据,数据的接受方在接收到header并发现是Chunk Transfer-Encoding以后,会进行对应的编解码操作。当然,这个解码操作一般是对应的Library底层的自动操作,上层用户直接看到的是一个解码以后的InputStream比如BufferedInputStream等,而不需要关心本身的解码细节。

注意,Chunked Transfer Coding的每一个chunk的数据大小是自描述的,因此,并不要求在一个stream连接中所有chunk的大小都相同。只要遵循Chunked Transfer Coding的数据格式,比如,如果发送的文件是一个日志文件,数据的发送方甚至可以按照一行一个chunk进行数据的发送。

我们在下文详细讲解通过WebHDFS读写文件时对Chunk的支持。

写文件使用Chunk Transfer-Encoding

我们通过connection.setChunkedStreamingMode(chunklen),就会enable Chunked Transfer Coding写文件,即为我们添加Transfer-Encoding: chunked的http header, 同时设置了一个chunkLength,我们传入的参数如果是不大于0的参数,那么chunkLength就设置为默认值4096,否则就按照我们设置的大小设置chunkLength

与之相对,我们可以调用connection.setFixedLengthStreamingMode (int contentLength) 来选择使用Fixed-Length Streaming
Chunked StreamingFixed-Length Streaming是相互排斥的,二者只能选其一,这是因为二者的使用场景完全不同:

  • setChunkedStreamingMode: 用于指定请求体使用分块传输编码 (Chunked Transfer Encoding)。分块传输编码允许将请求体分割成多个块,每个块都带有长度信息。这样可以在数据生成的同时发送,而无需等到整个请求体准备就绪。使用这种模式可以提高效率,特别是对于较大的请求体。

    connection.setChunkedStreamingMode(0); // 使用分块传输编码,0 表示使用默认块大小
    
  • setFixedLengthStreamingMode: 用于指定请求体的固定长度。这意味着在实际写入请求体数据之前,需要知道请求体的总长度。这样的模式适用于较小的请求体,且长度可预知的情况。

    int contentLength = calculateContentLength(); // 计算请求体的长度
    connection.setFixedLengthStreamingMode(contentLength);
    

    选择使用哪个方法取决于请求体的大小和生成的时间。对于动态生成的或大小未知的请求体,通常更适合使用 setChunkedStreamingMode。对于已知大小的请求体,使用 setFixedLengthStreamingMode 可能更为合适。

下面的代码就是HttpURLConnection中两个方法的实现:

------------------------------------------HttpURLConnection------------------------------------------public void setChunkedStreamingMode (int chunklen) {.......if (fixedContentLength != -1 || fixedContentLengthLong != -1) {throw new IllegalStateException ("Fixed length streaming mode set");}chunkLength = chunklen <=0? DEFAULT_CHUNK_SIZE : chunklen;}public void setFixedLengthStreamingMode (int contentLength) {......fixedContentLength = contentLength;}

我们从HttpURLConnection.getOutputStream0()方法可以看到,

  • 由于我们之前设置了chunkLength,因此,这里会构建对应的ChunkedOutputStream,这意味着,我们通过HttpURLConnection.getOutputStream()返回的OutputStream进行数据的写入的时候,会经过ChunkedOutputStream然后才到最底层的比如SocketOutputStream,即ChunkedOutputStream会帮我们进行数据的chunk编码并发送给远程。
  • 如果是Http Method是Get,会强制变成Post,即我们不可能在get请求中使用Chunk Transfer Encoding,如果调用了setChunkedStreamingMode(chunklen),那么Http Method会被换成POST。这是为什么我们在读文件的时候假如调用setChunkedStreamingMode(chunklen),WebHDFS会返回一个400,让我们误以为WebHDFS不支持Chunk Transfer Coding。后文会详细讲解。
  • 我们从writeRequests可以看到,假如我们通过setChunkedStreamingMode(0)设置了chunkLength,那么写文件的请求中会被隐含加上Transfer-Encoding: chunked 的 Http header。而如果我们通过setFixedLengthStreamingMode()设置了使用fixed-length streaming,那么就会隐含加上Content-Length 的 header。
private synchronized OutputStream getOutputStream0() throws IOException {.........if (method.equals("GET")) { //在获取OutputStream,GET会被强制替换成POSTmethod = "POST"; // Backward compatibility}....if (streaming()) {if (strOutputStream == null) {if (chunkLength != -1) { /* chunked */strOutputStream = new StreamingOutputStream(new ChunkedOutputStream(ps, chunkLength), -1L); //封装一层ChunkedOutputStream} else { /* must be fixed content length */long length = 0L;if (fixedContentLengthLong != -1) {length = fixedContentLengthLong;} else if (fixedContentLength != -1) {length = fixedContentLength;}strOutputStream = new StreamingOutputStream(ps, length);}.....}private void writeRequests() throws IOException {.....if (this.streaming()) {if (this.chunkLength != -1) { // 设置了chunk, 会帮我们添加Transfer-Encoding:chunked的headerthis.requests.set("Transfer-Encoding", "chunked");var14 = true;} else if (this.fixedContentLengthLong != -1L) { //如果是fixed-length code,会帮我们添加Content-Length headerthis.requests.set("Content-Length", String.valueOf(this.fixedContentLengthLong));} else if (this.fixedContentLength != -1) {this.requests.set("Content-Length", String.valueOf(this.fixedContentLength));}}

下图显示了我们使用Chunked Transfer Coding写文件的时候的客户端观察到的HttpURLConnection的运行时的outputStream,从最上层 HttpURLConnection.StreamingOutStream,经过ChunkedOutputStream一直到最下层SocketOutputStream的层层封装过程。
在这里插入图片描述

读文件使用Chunk Transfer-Encoding

刚刚说了,是否使用Chunk Transfer-Encoding,其实是数据的发送方的职责,因此,客户端从WebHDFS读文件,数据的发送方,WebHDFS Service,是否使用Chunk Transfer-Encoding,是它自己的选择。我们从WebHDFS服务器端代码可以看到,基于Netty的WebHDFS服务器端的确是使用Chunked Transfer Encoding来发送文件到客户端的。
DataNode启动的时候,会构造WebHDFS背后的Netty服务器端。构造时加入了ChunkedWriteHandler:

      this.httpServer = new ServerBootstrap().group(bossGroup, workerGroup).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();p.addLast(new HttpRequestDecoder(),.......p.addLast(new ChunkedWriteHandler(),new URLDispatcher(jettyAddr, conf, confForCreate));}});

从上面的代码可以看到,最后网ChannelPipeline中添加了两个ChannelHandlerChunkedWriteHandlerURLDispatcher,那么,这两个Handler在pipeline中的调用顺序是怎样的呢?

我们来看一下ChatGPT的准确回答:


在 Netty 中,ChannelPipeline 是一个处理事件的流水线。ChannelPipeline 中的每个节点是一个 ChannelHandler,负责处理特定类型的事件。当数据通过 Channel 时,它会在 ChannelPipeline 中的各个处理节点中依次经过,每个节点负责特定的处理。

对于 ChannelPipeline.addLast 添加的 ChannelHandler,它们的执行顺序是按照它们被添加的顺序来决定的。新添加的 ChannelHandler 会被添加到管道的尾部,因此会成为最后执行的处理器。

在读写的时候,事件的传递顺序如下:

  • 写操作: 当有数据要被写入 Channel 时,数据会从 ChannelPipeline 的尾部开始,依次经过每个 ChannelHandler write() 方法,最终传递给底层的通道。

  • 读操作: 当有数据从 Channel 读取时,数据会从 ChannelPipeline 的头部开始,依次经过每个 ChannelHandlerchannelRead() 方法,直到达到 ChannelPipeline 的尾部。


所以,在Netty Service向客户端写入数据的时候,从我们自定义的URLDispatcher中写入(其实内部会调用WebHdfsHandler),然后会被ChunkedWriteHandler进行处理,即chunk by chunk的进行数据的编码,最后发送给客户端。
根据ChunkedWriteHandler的官方文档,如果我们使用ChunkedWriteHandler来进行Chunk Transfer Coding的数据传输,它的输入,即它的前一个Handler必须写入的是ChunkedInput格式,这就是WebHdfsHandler.onOpen()的处理方式,ChunkedStreamChunkedInput的实现类:

  private void onOpen(ChannelHandlerContext ctx) throws IOException {........resp = new DefaultHttpResponse(HTTP_1_1, OK);HttpHeaders headers = resp.headers();.....final DFSClient dfsclient = newDfsClient(nnId, conf);HdfsDataInputStream in = dfsclient.createWrappedInputStream(dfsclient.open(path, bufferSize, true));in.seek(offset);long contentLength = in.getVisibleLength() - offset;.......final InputStream data;........headers.set(CONTENT_LENGTH, contentLength);data = new LimitInputStream(in, contentLength);ctx.write(resp); //先写元数据ctx.writeAndFlush(new ChunkedStream(data) { // 再写content,即读取的文件流。这个文件流被封装为ChunkedStream,进而交给pipeline中的ChunkedWriteHanlder,最后发送给客户端@Overridepublic void close() throws Exception {super.close();dfsclient.close();}}).addListener(ChannelFutureListener.CLOSE);}

Response Header中为什么没有Transfer-Encoding: chunked

但是很奇怪,我们从HttpUrlConnection收到的服务器端的Response Header中并没有看到Transfer-Encoding:chunked,相反,我们看到了Content-Length: 192832,难道文件并不是以Chunked Transfer-Encoding的编码方式发送的?
其实不是这样的。HttpUrlConnectionChunked Transfer-Encoding的数据处理并完全依赖Transfer-Encoding:chunked的header,它可以自动识别数据本身的格式,然后在底层自动识别并解码Chunked Transfer-Encoding,然后直接将解码后的结以InputStream的方式返回给客户端。Netty的ChunkedWriteHandler帮我们发送数据到远程客户端时,尽管数据是按照chunked格式发送给的,但是其并不会帮我们加上Transfer-Encoding:chunked到Response header中去,但是客户端依然是可以正确解析到的。ChunkedWriteHandler的好处是:

  • 提供更灵活的数据处理:ChunkedWriteHandler可以接收 FileRegionHttpContentByteBuf 对象,并负责将它们转换成分块的形式发送到下一个 ChannelOutboundHandler
  • 自动处理分块传输:ChunkedWriteHandler会自动将数据分块,确保数据按照 Chunked Transfer Encoding 规范传输。
  • 对于大文件或大数据流,使用 ChunkedWriteHandler 可以在传输过程中分块发送,降低内存占用。
    我们看到, WebHDFS服务器端使用了ChunkedWriteHandler,因此在ChannelPipeline中,需要将读取块数据的InputStream封装成ChunkedStream,因为ChunkedStreamChunkedInput 接口的实现类:
    final InputStream data;if (contentLength >= 0) {headers.set(CONTENT_LENGTH, contentLength);data = new LimitInputStream(in, contentLength);} else {data = in;}ctx.write(resp);ctx.writeAndFlush(new ChunkedStream(data) { //将读取块数据的InputStream封装为ChunkedStream,供ChunkedWriteHandler进行进一步的分块处理@Overridepublic void close() throws Exception {super.close();dfsclient.close();}}).addListener(ChannelFutureListener.CLOSE);

所以,其实ChunkedWriteHandler只是分块写入数据的一种更好的封装方式,方便了我们进行chunk数据的写入,但其实我们完全可以不用ChunkedWriteHandler而手动进行chunk数据写入,比如,我们自己在header中添加Transfer-Encoding:chunked,然后按照 Chunked Transfer Encoding 去写数据,服务端示例代码如下:

public class NettyChunkedWriteHandlerExample {public static void main(String[] args) {.....try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new HttpServerCodec());pipeline.addLast(new SimpleChannelInboundHandler<HttpMessage>() {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, HttpMessage msg) throws Exception {FileInputStream fis = new FileInputStream("/Users/cwu/chunk-test");ChunkedStream chunkedStream = new ChunkedStream(fis);// 设置响应头DefaultHttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1,HttpResponseStatus.OK);HttpHeaders headers = response.headers();headers.set(ACCESS_CONTROL_ALLOW_ORIGIN, "*");headers.set(CONTENT_TYPE, APPLICATION_OCTET_STREAM);headers.set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED); // 设置`Transfer-Encoding:chunked`ctx.write(response);chunk_write_stream(fis, ctx);....serverBootstrap.bind(8087).sync().channel().closeFuture().sync();.....}public static void chunk_write_stream(InputStream inputStream,  ChannelHandlerContext ctx) throws IOException {// 读取文件并以 chunked 形式写入响应byte[] buffer = new byte[1024];int bytesRead;while ((bytesRead = inputStream.read(buffer)) != -1) {if(bytesRead != 1024){System.out.println("The final bytesRead is not 1024, its size is " + bytesRead);byte[] buffer_copy = Arrays.copyOfRange(buffer, 0, bytesRead);ctx.write(new DefaultHttpContent(Unpooled.copiedBuffer(buffer_copy)));}else{ctx.write(new DefaultHttpContent(Unpooled.copiedBuffer(buffer)));}}//写入最后一个空字符ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT).addListener(ChannelFutureListener.CLOSE);}

这时候我们看基于HttpURLConnection的客户端,可以看到,客户端识别了Transfer-Encoding:chunked,下层Stream的封装已经自动有了ChunkedInputStream,同样完成了基于chunk的数据传输。

测试WebHDFS是否支持chunk Transfer-Encoding时的一个错误导致的错误判断

我在最初测试WebHDFS是否支持 Chunked Transfer-Encoding,错误的代码差点儿导致我做出了错误的结论,认为从WebHDFS读取数据无法使用Chunked Transfer Encoding
由于对Chunked Transfer-Encoding理解不正确,我以为要想让服务器端Chunk by Chunk的方式将文件传输过来,客户端需要在GET请求(由于是读文件,因此是GET请求)中添加Transfer-Encoding:chunked header,对于HttpURLConnection客户端,不用手动添加这个header,只需要调用connection.setChunkedStreamingMode(0);就可以了,我这样去做,但是服务器端抛出了一个400

java.io.IOException: Server returned HTTP response code: 400 for URL: http://rccd101-7a.sjc2.dev.conviva.com:9864/webhdfs/v1/user/hadoop/hello-world/session_summaries.PT_HOURLY_2023-09-07_2023-09-07.sql-2?op=OPEN&user.name=hadoop&namenoderpcaddress=olap-hdfs-test&offset=0at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1914)at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1512)at WebHDFSReadFileExample.chunk_read(WebHDFSReadFileExample.java:45)at WebHDFSReadFileExample.main(WebHDFSReadFileExample.java:86)

代码很简单,就是在创建HttpURLConnection的时候,增加了connection.setChunkedStreamingMode(0);。后来通过debug,发现,当我们调用connection.setChunkedStreamingMode(0);的时候,会带来了两个主要变化:

  1. 自动重定向会被disable:这个变化倒不是导致我们上面的异常的原因,但是需要引起注意。这意味着,即使我们没有显式地通过connection.setInstanceFollowRedirects(false);去关闭自动重定向,如果调用了connection.setChunkedStreamingMode(0);,自动重定向也会被关闭。加入式读文件,那么我们需要手动识别并处理307
  2. GET请求会被自动变成POST请求:这是导致400问题的原因。WebHDFS客户端是将Http Method和用户的operation联合在一起来识别操作的,代码如下。可以看到,如果是OPEN操作,那么Http Method必须是GET 才能被识别,因此,一个POST类型的OPEN操作无法被识别:
      public void handle(ChannelHandlerContext ctx, HttpRequest req)throws IOException, URISyntaxException {String op = params.op();HttpMethod method = req.getMethod();if (PutOpParam.Op.CREATE.name().equalsIgnoreCase(op)&& method == PUT) {onCreate(ctx);} else if (PostOpParam.Op.APPEND.name().equalsIgnoreCase(op)&& method == POST) {onAppend(ctx);} else if (GetOpParam.Op.OPEN.name().equalsIgnoreCase(op)&& method == GET) {onOpen(ctx); //读取文件的HTTP Method必须是Get} else if(GetOpParam.Op.GETFILECHECKSUM.name().equalsIgnoreCase(op)&& method == GET) {onGetFileChecksum(ctx);} else if(PutOpParam.Op.CREATE.name().equalsIgnoreCase(op)&& method == OPTIONS) {allowCORSOnCreate(ctx);} else {throw new IllegalArgumentException("Invalid operation " + op);}
    
  • 那么,为什么我们调用connection.setChunkedStreamingMode(0);,请求类型会被强制改成POST呢?因为connection.setChunkedStreamingMode(0);本来就应该是数据发送方的事情,数据的写入方通过设置Transfer-Encoding:chunked来告知数据接受方自己的数据是Chunked Transfer Coding,请按照Chunked Transfer Coding格式进行解析。所以,我们在读取WebHDFS文件的时候设置connection.setChunkedStreamingMode(0);是错误的操作。

WebHDFS的特性和不足总结

从上面的代码分析可以看到,先抛开性能差异不谈,在具体实现上,基于Hadoop Native的客户端的很多实现细节,肯定是无法在基于Http的WebHDFS中实现的。因此,WebHDFS表面上能成为Hadoop Native的替代,但是其实整个流程都发生了变化。总而言之,我们通过实验和HDFS的代码发现:

  1. WebHDFS不支持HDFS HA。但是这还好,我们可以在WebHDFS客户端上层手动做判断,实现HA。
  2. 写大文件的时候所有Block的第一个Replica会集中在某一台机器上。
  3. 读取文件的时候,文件的所有Block的读取操作都来自于第一个Block的某一个Replica机器
  4. 基于Http本身的限制,hedged read肯定完全不再可能。
  5. 数据的写入和读取都支持Chunked Transfer Coding
  6. 性能问题。我没有进行benchmark测试,有兴趣的读者可以自行寻找测试结果或者自行测试。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/236687.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

数据结构与算法教程,数据结构C语言版教程!(第三部分、栈(Stack)和队列(Queue)详解)五

第三部分、栈(Stack)和队列(Queue)详解 栈和队列&#xff0c;严格意义上来说&#xff0c;也属于线性表&#xff0c;因为它们也都用于存储逻辑关系为 "一对一" 的数据&#xff0c;但由于它们比较特殊&#xff0c;因此将其单独作为一章&#xff0c;做重点讲解。 使用栈…

[自动驾驶算法][从0开始轨迹预测]:一、坐标和坐标系变换

既然要从0开始轨迹预测&#xff0c;那从哪开始写起呢&#xff1f;回想下自己的学习历程&#xff0c;真正有挑战性的不是模型结构&#xff0c;不是繁琐的训练和调参&#xff0c;而是数据的制作&#xff01;&#xff01;&#xff01; 笔者自认为不是一个数学基础牢固的人&#xf…

如何使用iPad通过Code App+cpolar实现公网地址远程访问vscode

&#x1f308;个人主页: Aileen_0v0 &#x1f525;热门专栏: 华为鸿蒙系统学习|计算机网络|数据结构与算法 ​&#x1f4ab;个人格言:“没有罗马,那就自己创造罗马~” 文章目录 1. 在iPad下载Code APP2.安装cpolar内网穿透2.1 cpolar 安装2.2 创建TCP隧道 3. iPad远程vscode4. …

删除的数据恢复

1回收站恢复 1.1回收站删除 新手删除是通过del键或者鼠标右键删除,这种删除是并不是真正的删除,而是放到了回收站 1.2回收站的数据恢复 回收站的数据,你要恢复那个直接右键还原即可,删除到回收站的数据并不能称得上是删除,回收站的本质也是一个文件夹,只不过是个特殊的文件…

《GreenPlum系列》GreenPlum初级教程-03GreenPlum系统管理

文章目录 第三章 GreenPlum系统管理1.关于GreenPlum数据库发布版本号2.启动和停止GreenPlum数据库2.1 启动数据库2.2 重启数据库2.3 仅重新载入配置文件更改2.4 停止GreenPlum数据库2.5 停止客户端进程 3.GreenPlum数据库状态查询4.访问GreenPlum数据库4.1 数据库会话参数4.2 支…

Camunda Spin

Spin 常用于在脚本中解析json或者xml使用&#xff0c;S(variable) 表示构造成Spin对象&#xff0c;通过prop(“属性名”)获取属性值&#xff0c;通过stringValue()、numberValue()、boolValue() 等对类型转换。 repositoryService.createDeployment().name("消息事件流程&…

Vim一键配置指南,打造高效率C++开发环境

文章目录 前言安装与卸载功能演示gcc/g升级问题 前言 Vim作为当下最受欢迎的文本编译器之一&#xff0c;不仅具有强大的文本编辑功能&#xff0c;还提供了高度的可定制性。用户可以根据自己的喜好自定义配置&#xff0c;并且通过自己编写插件或者使用现有的插件来扩展Vim的功能…

【PostgreSQL创建索引的锁分析和使用注意】

1.1 创建普通B-tree索引的整体流程 如下是梳理的创建普通B-tree索引的大概流程&#xff0c;可供参考。 1.校验新索引的Catalog元数据|语法解析 ---将创建索引的sql解析成IndexStmt结构&#xff5c;校验B-Tree的handler -----校验内核是否支持该类型的索引,在pg_am中查找&q…

AR HUD全面「上新」

AR HUD赛道正在迎来新的时代。 上周&#xff0c;蔚来ET9正式发布亮相&#xff0c;新车定位为D级行政旗舰轿车&#xff0c;其中&#xff0c;在智能座舱交互层面&#xff0c;继理想L系列、长安深蓝S7之后&#xff0c;也首次取消仪表盘&#xff0c;取而代之的是业内首个全焦段AR H…

thinkphp6报错Driver [Think] not supported.

thinkphp6报错Driver [Think] not supported. 问题解决方法测试 问题 直接使用 View::fetch();渲染模板报错 解决方法 这个报错是由于有安装视图驱动造成的 运行如下命令安装即可 composer require topthink/think-view官方文档中是这么写的 视图功能由\think\View类配合视…

【hyperledger-fabric】部署Java应用远程访问智能合约

简介 首先是根据b站的视频 hyperledger-fabric【3】在 java 应用中访问合约 以及hyperledger-fabric【5】Java应用和私有数据&#xff0c;本文章主要讲述的是视频中我遇到的问题&#xff0c;以及相关知识点的总结。 遇到的问题 问题1&#xff1a;git clone下载下来的代码发现…

Java接入Apache Spark(入门环境搭建、常见问题)

Java接入Apache Spark&#xff08;环境搭建、常见问题&#xff09; 背景介绍 Apache Spark 是一个快速的&#xff0c;通用的集群计算系统。它对 Java&#xff0c;Scala&#xff0c;Python 和 R 提供了的高层 API&#xff0c;并有一个经优化的支持通用执行图计算的引擎。它还支…

归并排序-排序算法

前言 如果一个数组的左右区间都有序&#xff0c;我们可以使用一种方法&#xff08;归并&#xff09;&#xff0c;使这个数组变得有序。 如下图&#xff1a; 过程也很简单&#xff0c;分别取左右区间中的最小元素&#xff0c;再把其中较小的元素放到临时数组中&#xff0c;例如…

ElasticSearch 学习9 spring-boot ,elasticsearch7.16.1实现中文拼音分词搜索

一、elasticsearch官网下载&#xff1a;Elasticsearch 7.16.1 | Elastic 二、拼音、ik、繁简体转换插件安装 ik分词&#xff1a;GitHub - medcl/elasticsearch-analysis-ik: The IK Analysis plugin integrates Lucene IK analyzer into elasticsearch, support customized d…

[开发语言][c++][python]:C++与Python中的赋值、浅拷贝与深拷贝

C与Python中的赋值、浅拷贝与深拷贝 1. Python中的赋值、浅拷贝、深拷贝2. C中的赋值、浅拷贝、深拷贝2.1 概念2.2 示例&#xff1a;从例子中理解1) 不可变对象的赋值、深拷贝、浅拷贝2) 可变对象的赋值、浅拷贝与深拷贝3) **可变对象深浅拷贝(外层、内层改变元素)** 写在前面&…

Salesforce财务状况分析

纵观Salesforce发展史和十几年财报中的信息&#xff0c;Salesforce从中小企业CRM服务的蓝海市场切入&#xff0c;但受限于中小企业的生命周期价值和每用户平均收入小且获客成本和流失率不对等&#xff0c;蓝海同时也是死海。 Salesforce通过收购逐渐补足品牌和产品两块短板&am…

系统架构设计师教程(十)软件可靠性基础知识

软件可靠性基础知识 10.1 软件架构演化和定义的关系10.1.1 演化的重要性10.1.2 演化和定义的关系 10.2 面向对象软件架构演化过程10.2.1 对象演化10.2.2 消息演化10.2.3 复合片段演化10.2.4 约束演化 10.3 软件架构演化方式的分类10.3.1 软件架构演化时期10.3.2 软件架构静态演…

mp4文件全部转换为mp3

问题 今天突发奇想&#xff0c;想把mp4视频转换为mp3来收听&#xff0c;于是想到了ffmpeg工具 步骤 安装ffmpeg环境 要在 Windows 上配置 FFmpeg 环境&#xff0c;你可以按照以下步骤进行操作&#xff1a; 下载 FFmpeg&#xff1a; 首先&#xff0c;你需要下载 FFmpeg 的 W…

C#进阶-IIS服务器发布ASP.NET项目

对于云服务器&#xff0c;程序员一般不会陌生&#xff0c;如果项目需要发布到现网&#xff0c;那么服务器是必不可缺的一项硬性条件&#xff0c;那么如何在云服务器上部署一个项目&#xff0c;需要做哪些配置准备&#xff0c;下面就由本文档为大家讲解&#xff0c;本篇以 IIS服…

暴打小苹果

欢迎来到程序小院 暴打小苹果 玩法&#xff1a;鼠标左键点击任意区域可发招暴打&#xff0c;在苹果到达圆圈时点击更容易击中&#xff0c; 30秒挑战暴打小苹果&#xff0c;打中一次20分&#xff0c;快去暴打小苹果吧^^。开始游戏https://www.ormcc.com/play/gameStart/247 htm…