Flume-transaction机制源码分析

一、整体流程

在这里插入图片描述
FileChannel主要是由WAL预写日志和内存队列FlumeEventQueue组成。
在这里插入图片描述

二、Transaction

public interface Transaction {// 描述transaction状态enum TransactionState { Started, Committed, RolledBack, Closed }void begin();void commit();void rollback();void close();
}

在这里插入图片描述

三、Put()

Put事务发生在source往channel写入数据

// source收集到events后,交由channel处理
getChannelProcessor().processEventBatch(events);
  public void processEventBatch(List<Event> events) {Preconditions.checkNotNull(events, "Event list must not be null");events = interceptorChain.intercept(events);//必需通道Map<Channel, List<Event>> reqChannelQueue =new LinkedHashMap<Channel, List<Event>>();//可选通道,可选通道是指在定义 SinkGroup 时可以选择添加的通道,用于将数据从 SinkGroup 中的一个 Sink 传输到另一个 Sink//没有配置可选通道可忽略。Map<Channel, List<Event>> optChannelQueue =new LinkedHashMap<Channel, List<Event>>();for (Event event : events) {List<Channel> reqChannels = selector.getRequiredChannels(event);for (Channel ch : reqChannels) {List<Event> eventQueue = reqChannelQueue.get(ch);if (eventQueue == null) {eventQueue = new ArrayList<Event>();reqChannelQueue.put(ch, eventQueue);}eventQueue.add(event);}List<Channel> optChannels = selector.getOptionalChannels(event);for (Channel ch : optChannels) {List<Event> eventQueue = optChannelQueue.get(ch);if (eventQueue == null) {eventQueue = new ArrayList<Event>();optChannelQueue.put(ch, eventQueue);}eventQueue.add(event);}}// Process required channelsfor (Channel reqChannel : reqChannelQueue.keySet()) {Transaction tx = reqChannel.getTransaction();Preconditions.checkNotNull(tx, "Transaction object must not be null");try {tx.begin();List<Event> batch = reqChannelQueue.get(reqChannel);for (Event event : batch) {//进入必选通道,调用put()reqChannel.put(event);}tx.commit();} catch (Throwable t) {tx.rollback();if (t instanceof Error) {LOG.error("Error while writing to required channel: " + reqChannel, t);throw (Error) t;} else if (t instanceof ChannelException) {throw (ChannelException) t;} else {throw new ChannelException("Unable to put batch on required " +"channel: " + reqChannel, t);}} finally {if (tx != null) {tx.close();}}}// Process optional channelsfor (Channel optChannel : optChannelQueue.keySet()) {Transaction tx = optChannel.getTransaction();Preconditions.checkNotNull(tx, "Transaction object must not be null");try {tx.begin();List<Event> batch = optChannelQueue.get(optChannel);for (Event event : batch) {optChannel.put(event);}tx.commit();} catch (Throwable t) {tx.rollback();LOG.error("Unable to put batch on optional channel: " + optChannel, t);if (t instanceof Error) {throw (Error) t;}} finally {if (tx != null) {tx.close();}}}}
  //org.apache.flume.channel.BasicChannelSemantics.javapublic void put(Event event) throws ChannelException {BasicTransactionSemantics transaction = currentTransaction.get();Preconditions.checkState(transaction != null,"No transaction exists for this thread");//调用transaction对象put()transaction.put(event);}
  //org.apache.flume.channel.BasicTransactionSemantics.javaprotected void put(Event event) {Preconditions.checkState(Thread.currentThread().getId() == initialThreadId,"put() called from different thread than getTransaction()!");Preconditions.checkState(state.equals(State.OPEN),"put() called when transaction is %s!", state);Preconditions.checkArgument(event != null,"put() called with null event!");try {doPut(event);} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new ChannelException(e.toString(), e);}}
// org.apache.flume.channel.file.FileBackedTransaction.java
protected void doPut(Event event) throws InterruptedException {channelCounter.incrementEventPutAttemptCount();if (putList.remainingCapacity() == 0) {throw new ChannelException("Put queue for FileBackedTransaction " +"of capacity " + putList.size() + " full, consider " +"committing more frequently, increasing capacity or " +"increasing thread count. " + channelNameDescriptor);}// this does not need to be in the critical section as it does not// modify the structure of the log or queue.if (!queueRemaining.tryAcquire(keepAlive, TimeUnit.SECONDS)) {throw new ChannelFullException("The channel has reached it's capacity. "+ "This might be the result of a sink on the channel having too "+ "low of batch size, a downstream system running slower than "+ "normal, or that the channel capacity is just too low. "+ channelNameDescriptor);}boolean success = false;//doTake,也会获取该锁,所以doTake和doPut只能操作一个,无法同时操作log.lockShared();try {// transactionID在前面创建transaction对象的时候定义的// 将put事件写入WAL日志,并将数据持久化到磁盘文件FlumeEventPointer ptr = log.put(transactionID, event);// 将event pointer放到内存队列putListPreconditions.checkState(putList.offer(ptr), "putList offer failed "+ channelNameDescriptor);// 将event也放到inflightPuts中,这是临时数据,因为还没有提交// 如果还没有提交就直接放到FlumeEventQueue,那么将提前暴露给sink。queue.addWithoutCommit(ptr, transactionID);success = true;} catch (IOException e) {channelCounter.incrementEventPutErrorCount();throw new ChannelException("Put failed due to IO error "+ channelNameDescriptor, e);} finally {log.unlockShared();if (!success) {// release slot obtained in the case// the put fails for any reasonqueueRemaining.release();}}}
  // org.apache.flume.channel.file.FlumeEventQueue.javasynchronized void addWithoutCommit(FlumeEventPointer e, long transactionID) {inflightPuts.addEvent(transactionID, e.toLong());}public void addEvent(Long transactionID, Long pointer) {// event放到inflightEventsinflightEvents.put(transactionID, pointer);inflightFileIDs.put(transactionID,FlumeEventPointer.fromLong(pointer).getFileID());syncRequired = true;}

InflightPuts 是实际正在传输中的事件集合,Flume 使用 InflightPuts 来跟踪正在传输的事件,只有事务提交了,InflightPuts 里才会清空这次事务的所有pointer数据。InflightPuts 和Flume的checkpoint机制密切相关。

先来看下Flume如何将event持久化到磁盘,将写到"log-"前缀的文件中。log.put(transactionID, event):

// org.apache.flume.channel.file.Log.java
FlumeEventPointer put(long transactionID, Event event)throws IOException {Preconditions.checkState(open, "Log is closed");FlumeEvent flumeEvent = new FlumeEvent(event.getHeaders(), event.getBody());//封装Put操作,WAL日志会记录四种操作,分别是Put,Take,Commit和RollbackPut put = new Put(transactionID, WriteOrderOracle.next(), flumeEvent);ByteBuffer buffer = TransactionEventRecord.toByteBuffer(put);//选择数据目录的数据文件,比如log-1int logFileIndex = nextLogWriter(transactionID);long usableSpace = logFiles.get(logFileIndex).getUsableSpace();long requiredSpace = minimumRequiredSpace + buffer.limit();if (usableSpace <= requiredSpace) {throw new IOException("Usable space exhausted, only " + usableSpace +" bytes remaining, required " + requiredSpace + " bytes");}boolean error = true;try {try {// Put事件写入WAL日志文件,Event也就持久化到文件了// logFileIndex就是数据文件ID,比如log-1文件FlumeEventPointer ptr = logFiles.get(logFileIndex).put(buffer);error = false;return ptr;} catch (LogFileRetryableIOException e) {if (!open) {throw e;}roll(logFileIndex, buffer);FlumeEventPointer ptr = logFiles.get(logFileIndex).put(buffer);error = false;return ptr;}} finally {if (error && open) {roll(logFileIndex);}}}

再来看下commit,tx.commit():

  // org.apache.flume.channel.BasicTransactionSemantics.javapublic void commit() {Preconditions.checkState(Thread.currentThread().getId() == initialThreadId,"commit() called from different thread than getTransaction()!");Preconditions.checkState(state.equals(State.OPEN),"commit() called when transaction is %s!", state);try {doCommit();} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new ChannelException(e.toString(), e);}// 修改transaction状态state = State.COMPLETED;}
// org.apache.flume.channel.file.FileChannel.FileBackedTransaction
protected void doCommit() throws InterruptedException {int puts = putList.size();int takes = takeList.size();if (puts > 0) {Preconditions.checkState(takes == 0, "nonzero puts and takes "+ channelNameDescriptor);log.lockShared();try {// commit操作写入WAL日志log.commitPut(transactionID);channelCounter.addToEventPutSuccessCount(puts);synchronized (queue) {while (!putList.isEmpty()) {// 将putList的event pointer放到transaction的queue中// channel的容量就是channel transaction的queue的容量if (!queue.addTail(putList.removeFirst())) {StringBuilder msg = new StringBuilder();msg.append("Queue add failed, this shouldn't be able to ");msg.append("happen. A portion of the transaction has been ");msg.append("added to the queue but the remaining portion ");msg.append("cannot be added. Those messages will be consumed ");msg.append("despite this transaction failing. Please report.");msg.append(channelNameDescriptor);LOG.error(msg.toString());Preconditions.checkState(false, msg.toString());}}//清除事务ID//清空inflightPutsqueue.completeTransaction(transactionID);}} catch (IOException e) {throw new ChannelException("Commit failed due to IO error "+ channelNameDescriptor, e);} finally {log.unlockShared();}} else if (takes > 0) {//省略代码//......}//清空putList.clear();takeList.clear();channelCounter.setChannelSize(queue.getSize());}
  // org.apache.flume.channel.file.FlumeEventQueuesynchronized void completeTransaction(long transactionID) {//清空inflightPutsif (!inflightPuts.completeTransaction(transactionID)) {inflightTakes.completeTransaction(transactionID);}}
    // org.apache.flume.channel.file.FlumeEventQueue.InflightEventWrapper// inflightPuts和inflightTakes都是InflightEventWrapper的实例对象public boolean completeTransaction(Long transactionID) {if (!inflightEvents.containsKey(transactionID)) {return false;}//清除inflightPuts的事务ID//清空inflightPuts的inflightEventsinflightEvents.removeAll(transactionID);inflightFileIDs.removeAll(transactionID);syncRequired = true;return true;}

内存队列putList中的event pointer放入到transaction的queue中,并清空了inflightPuts,说明当前put事务已经提交成功了

四、Take()

Take事务发生在sink从channel拿取数据。

public Status process() throws EventDeliveryException {Status result = Status.READY;Channel channel = getChannel();Transaction transaction = channel.getTransaction();Event event = null;try {transaction.begin();//从channel拿取数据event = channel.take();......transaction.commit();} catch (Exception ex) {transaction.rollback();throw new EventDeliveryException("Failed to process transaction" , ex);} finally {transaction.close();}return result;}
 // org.apache.flume.channel.BasicTransactionSemantics.javaprotected Event take() {Preconditions.checkState(Thread.currentThread().getId() == initialThreadId,"take() called from different thread than getTransaction()!");Preconditions.checkState(state.equals(State.OPEN),"take() called when transaction is %s!", state);try {return doTake();} catch (InterruptedException e) {Thread.currentThread().interrupt();return null;}}
// org.apache.flume.channel.file.FileChannel.FileBackedTransaction
protected Event doTake() throws InterruptedException {channelCounter.incrementEventTakeAttemptCount();if (takeList.remainingCapacity() == 0) {throw new ChannelException("Take list for FileBackedTransaction, capacity " +takeList.size() + " full, consider committing more frequently, " +"increasing capacity, or increasing thread count. "+ channelNameDescriptor);}log.lockShared();/** 1. Take an event which is in the queue.* 2. If getting that event does not throw NoopRecordException,*    then return it.* 3. Else try to retrieve the next event from the queue* 4. Repeat 2 and 3 until queue is empty or an event is returned.*/try {while (true) {// 从FlumeEventQueue中取出event pointer// 并将event pointer放到inflightTakesFlumeEventPointer ptr = queue.removeHead(transactionID);if (ptr == null) {return null;} else {try {// first add to takeList so that if write to disk// fails rollback actually does it's work//首先将pointer放入takeList中Preconditions.checkState(takeList.offer(ptr),"takeList offer failed "+ channelNameDescriptor);// take操作写入WAL日志log.take(transactionID, ptr); // write take to disk//根据pointer从持久化文件中获取eventEvent event = log.get(ptr);return event;} catch (IOException e) {channelCounter.incrementEventTakeErrorCount();throw new ChannelException("Take failed due to IO error "+ channelNameDescriptor, e);} catch (NoopRecordException e) {LOG.warn("Corrupt record replaced by File Channel Integrity " +"tool found. Will retrieve next event", e);takeList.remove(ptr);} catch (CorruptEventException ex) {channelCounter.incrementEventTakeErrorCount();if (fsyncPerTransaction) {throw new ChannelException(ex);}LOG.warn("Corrupt record found. Event will be " +"skipped, and next event will be read.", ex);takeList.remove(ptr);}}}} finally {log.unlockShared();}}

看一下queue.removeHead(transactionID)

synchronized FlumeEventPointer removeHead(long transactionID) {if (backingStore.getSize() == 0) {return null;}long value = remove(0, transactionID);Preconditions.checkState(value != EMPTY, "Empty value "+ channelNameDescriptor);FlumeEventPointer ptr = FlumeEventPointer.fromLong(value);backingStore.decrementFileID(ptr.getFileID());return ptr;}
protected synchronized long remove(int index, long transactionID) {if (index < 0 || index > backingStore.getSize() - 1) {throw new IndexOutOfBoundsException("index = " + index+ ", queueSize " + backingStore.getSize() + " " + channelNameDescriptor);}copyCount++;long start = System.currentTimeMillis();long value = get(index);if (queueSet != null) {queueSet.remove(value);}//if txn id = 0, we are recovering from a crash.if (transactionID != 0) {//将queue的event pointer加入到inflightTakes中inflightTakes.addEvent(transactionID, value);}if (index > backingStore.getSize() / 2) {// Move tail part to leftfor (int i = index; i < backingStore.getSize() - 1; i++) {long rightValue = get(i + 1);set(i, rightValue);}set(backingStore.getSize() - 1, EMPTY);} else {// Move head part to rightfor (int i = index - 1; i >= 0; i--) {long leftValue = get(i);set(i + 1, leftValue);}set(0, EMPTY);backingStore.setHead(backingStore.getHead() + 1);if (backingStore.getHead() == backingStore.getCapacity()) {backingStore.setHead(0);}}backingStore.setSize(backingStore.getSize() - 1);copyTime += System.currentTimeMillis() - start;return value;}

拿到event后,再来看commit事务:

protected void doCommit() throws InterruptedException {int puts = putList.size();int takes = takeList.size();if (puts > 0) {//省略代码//......} else if (takes > 0) {log.lockShared();try {log.commitTake(transactionID);//清除事务ID//清空inflightTakesqueue.completeTransaction(transactionID);channelCounter.addToEventTakeSuccessCount(takes);} catch (IOException e) {throw new ChannelException("Commit failed due to IO error "+ channelNameDescriptor, e);} finally {log.unlockShared();}queueRemaining.release(takes);}putList.clear();takeList.clear();channelCounter.setChannelSize(queue.getSize());}
  // org.apache.flume.channel.file.FlumeEventQueuesynchronized void completeTransaction(long transactionID) {if (!inflightPuts.completeTransaction(transactionID)) {//清空inflightTakesinflightTakes.completeTransaction(transactionID);}}

如果take事务提交,则从transaction的queue中取出了event pointer,并清空inflightTakes和takeList

transaction.rollback()

protected void doRollback() throws InterruptedException {int puts = putList.size();int takes = takeList.size();log.lockShared();try {if (takes > 0) {Preconditions.checkState(puts == 0, "nonzero puts and takes "+ channelNameDescriptor);synchronized (queue) {while (!takeList.isEmpty()) {// 把takeList中的数据放回到transaction的queue中Preconditions.checkState(queue.addHead(takeList.removeLast()),"Queue add failed, this shouldn't be able to happen "+ channelNameDescriptor);}}}putList.clear();takeList.clear();//清除事务ID//清空inflightTakesqueue.completeTransaction(transactionID);channelCounter.setChannelSize(queue.getSize());//rollback操作写入WAL日志log.rollback(transactionID);} catch (IOException e) {throw new ChannelException("Commit failed due to IO error "+ channelNameDescriptor, e);} finally {log.unlockShared();// since rollback is being called, puts will never make it on// to the queue and we need to be sure to release the resourcesqueueRemaining.release(puts);}}

take事务回滚时,takeList中的数据重新放回到transaction的queue中,并清空这次事务的inflightTakes

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/280710.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Introduction to Data Mining 数据挖掘

Why Data Mining? • The Explosive Growth of Data: from terabytes to petabytes — Data collection and data availability ◦ Automated data collection tools, database systems, Web, computerized society — Major sources of abundant data ◦ Business: Web, e-co…

哔哩哔哩秋招Java二面

前言 作者&#xff1a;晓宜 个人简介&#xff1a;互联网大厂Java准入职&#xff0c;阿里云专家博主&#xff0c;csdn后端优质创作者&#xff0c;算法爱好者 一面过后面试官叫我别走&#xff0c;然后就直接二面&#xff0c;二面比较简短&#xff0c;记录一下&#xff0c;希望可以…

JWT 认证机制

1. 了解 Session 认证的局限性 Session 认证机制需要配合 Cookie 才能实现。由于 Cookie 默认不支持跨域访问&#xff0c;所以当涉及到前端跨域请求后端接口的时候&#xff0c;需要做很多额外的配置&#xff0c;才能实现跨域 Session 认证 注意&#xff1a; 1. 当前端请求后端接…

AMPQ和rabbitMQ

RabbitMQ 的 Channel、Connection、Queue 和 Exchange 都是按照 AMQP&#xff08;Advanced Message Queuing Protocol&#xff09;标准实现的。 AMPQ的网络部分 AMQP没有使用HTTP&#xff0c;使用TCP自己实现了应用层协议。 AMQP实现了自己特有的网络帧格式。 一个Connection…

用 Visual Studio 调试器中查看内存中图像

返回目录&#xff1a;OpenCV系列文章目录&#xff08;持续更新中......&#xff09; 前一篇&#xff1a;OpenCV4.9.0在windows系统下的安装 后一篇&#xff1a;OpenCV-Java 开发简介 ​警告 本教程可以包含过时的信息。 Image Watch 是 Microsoft Visual Studio 的插件&a…

气膜馆建造成本高吗?专业气膜厂家告诉你!

气膜馆作为一种现代化建筑结构&#xff0c;在体育、娱乐和展览领域备受青睐。然而&#xff0c;对于许多人来说&#xff0c;最关心的问题之一就是建造气膜馆的成本。下面轻空间气膜厂家将带您深入探讨气膜馆的建设成本&#xff0c;特别关注每平米的建设成本是多少&#xff0c;以…

matlab 将矩阵写入文件

目录 一、概述1、算法概述2、主要函数二、将矩阵写入到文本文件三、将矩阵写入电子表格文件四、将矩阵写入指定的工作表和范围五、将数据追加到电子表格六、将矩阵数据追加到文本文件七、参考链接本文由CSDN点云侠原创,原文链接。如果你不是在点云侠的博客中看到该文章,那么此…

Android 系统源码快速入门

Android源码快速入门 今天分享的内容是Android源码快速入门&#xff0c;主要分为以下几个步骤&#xff1a; * 硬件要求 * 虚拟机安装 * 开发环境搭建 * 下载编译源码 * 从一个简单的实际开发需求体验 Framework 开发硬件要求 用于 Android Framework 开发的电脑需要较强的 C…

HBase在表操作--显示中文

启动HBase后&#xff0c;Master和RegionServer两个服务器&#xff0c;分别对应进程为HMaster和HRegionServe。&#xff08;可通过jps查看&#xff09; 1.进入表操作 hbase shell 2.查看当前库中存在的表 list 3.查看表中数据&#xff08;注&#xff1a;学习期间可用&#…

传统企业在推行TPM时需要注意哪些问题?

当下&#xff0c;传统企业面临着转型升级的巨大压力。为了提升生产效率和产品质量&#xff0c;许多企业选择推行全面生产维护&#xff08;Total Productive Maintenance&#xff0c;简称TPM&#xff09;管理模式。然而&#xff0c;在实际推行过程中&#xff0c;传统企业需要注意…

MAC IntelliJ IDEA搭建Doris Fe

目录 版本信息 安装环境依赖 拉取源码 下载 Doris 编译依赖 修改系统最大文件句柄数 编译 Doris 配置 Debug 环境 生成 FE 代码 FE模块代码导入 配置 Debug FE 启动 FE 报错问题 版本信息 本次安装的doris版本信息为doris-2.1.0-rc11 IntelliJ IDEA 配置jdk17、m…

CCF202309-2——坐标变换(其二)80分代码及思路

思路&#xff0c;还是暴力求解&#xff0c;直接对每一个操作进行遍历&#xff0c;最后结果保留三位小数&#xff0c;但是不知道为什么直接printf("%.3f",x)进行输出没有分&#xff0c;结果完全对得上&#xff0c;以下是80分提交代码&#xff0c;可能大的测试点没有通…

龙芯新世界系统(安同AOCS OS)安装Cinnamon桌面最新版6.0.4

龙芯的新世界系统安同AOCS OS是十分优秀的操作系统&#xff0c;处于纯社区方式运行&#xff0c;她的各组件更新得很及时&#xff0c;很多组件都处于最新的状态&#xff0c;给我们安装使用最新的开源软件提供了很好的基础。由于本人一直使用Cinnamon桌面环境&#xff0c;各方面都…

设计模式|工厂模式

文章目录 1. 工厂模式的三种实现2. 简单工厂模式和工厂方法模式示例3. 抽象工厂模式示例4. 工厂模式与多态的关系5. 工程模式与策略模式的关系6. 面试中可能遇到的问题6.1 **工厂模式的概念是什么&#xff1f;**6.2 **工厂模式解决了什么问题&#xff1f;**6.3 **工厂模式的优点…

Flume入门概述及安装部署

目录 一、Flume概述1.1 Flume定义1.2 Flume基础架构 二、Flume安装部署 一、Flume概述 1.1 Flume定义 Flume是Cloudera提供的一个高可用的&#xff0c;高可靠的&#xff0c;分布式的海量日志采集、聚合和传输的系统。Flume基于流式架构&#xff0c;灵活简单。 1.2 Flume基础…

基于深度学习的面部情绪识别算法仿真与分析

声明&#xff1a;以下内容均属于本人本科论文内容&#xff0c;禁止盗用&#xff0c;否则将追究相关责任 基于深度学习的面部情绪识别算法仿真与分析 摘要结果分析1、本次设计通过网络爬虫技术获取了七种面部情绪图片&#xff1a;吃惊、恐惧、厌恶、高兴、伤心、愤怒、自然各若…

OpenGL+QT实现矢量和影像的叠加绘制

一、QT下OpenGL框架的初始化 OpenGL的介绍我在这里就没有必要介绍了&#xff0c;那OpenGL和QT的结合在这里就有必要先介绍一下&#xff0c;也就是怎么使用QT下的OpenGL框架。要想使用QT下的OpenGL框架&#xff0c;就必须要子类化QGLWidget&#xff0c;然后实现。 void initia…

流畅的 Python 第二版(GPT 重译)(十一)

第二十章&#xff1a;并发执行器 抨击线程的人通常是系统程序员&#xff0c;他们心中有着典型应用程序员终其一生都不会遇到的用例。[…] 在 99%的用例中&#xff0c;应用程序员可能会遇到的情况是&#xff0c;生成一堆独立线程并将结果收集到队列中的简单模式就是他们需要了解…

sqllab第三十四关通关笔记

知识点&#xff1a; 宽字节注入单引号闭合注意&#xff1a;不能直接在输入框进行宽字节注入&#xff0c;会被url编码&#xff0c;除非输入原始字符&#xff08;%df已经是url编码了&#xff0c;直接输入会二次编码&#xff09;错误注入 payload:username1%dforextractvalue(1,c…

【G3D笔记】AI生成式3D算法相关环境安装爬坑笔记

【G3D笔记】AI生成式3D算法相关环境安装爬坑笔记) 1、 RayMarching1.1 error C1189: #error: You need C++17 to compile PyTorch1.2 raymarching安装环境版本测试1.3 host_config.h(231): fatal error C1083: 无法打开包括文件: “crtdefs.h”2、Tiny-Cuda-nn2.1 HTTP/2 stre…