Node.js 高级编程之 Stream(我是跟 ChatGPT 学会的)

前言

在做 SSR Stream Render 的时候遇到了 Node.js 的 Stream,但是对其总是一知半解。正好最近 ChatGPT 很火,找他学一学吧,没想到真的把我教会了。PS:文末有跟 ChatGPT 的精彩对话(请忽略我稀烂的英语)。

为什么需要 Stream

首先我们通过一个简单的例子来说明一下,使用流的好处。如下所示,我们将一个大文件读取到另一个文件中:

const fs = require('fs')fs.readFile('./big.file', (err, data) => {if (err) throw errfs.writeFile('./out', data, () => {})
}) 

通过活动监视器,我们发现该进程内存占用为 300 MB 左右:

如果,我们换成流,情况就不一样了:

const fs = require('fs')const readStream = fs.createReadStream('./big.file')
const writeStream = fs.createWriteStream('./out')
readStream.pipe(writeStream) 

看来 Stream 在处理大数据的时候是非常好的工具,接下来就让我们通过打比方的方式来进行理解吧。

通过比喻来理解 Stream

Readable Stream

首先,对于 Readable Stream,我们可以把他比喻成一个水龙头:

水龙头的水来自于哪,需要具体的 Readable Stream 来实现。比如 fs.createReadStream 创建的 Readable Stream 其水源自于文件,process.stdin 水源自于标准输入。

两个状态 flowing 和 paused

水龙头有两个状态 flowingpaused,即龙头打开或关闭。初始化一个 Readable Stream 时,默认是关闭的:

const readStream = fs.createReadStream('./file')
console.log(readStream._readableState.flowing, readStream._readableState.paused) // false true 

当我们监听 data 事件时,会自动打开开关:

const readStream = fs.createReadStream('./file')
readStream.on('data', (chunk) => {console.log(chunk)
})
console.log(readStream._readableState.flowing, readStream._readableState.paused) // true false 

且会通知水源往龙头中灌水,这样,水就流到了 data 事件的回调函数中:

我们也可以通过 resume 方法来手动开启水龙头,不过要小心,有可能导致水丢失:

const readStream = fs.createReadStream('./file')
readStream.resume()
setTimeout(() => {readStream.on('data', console.log) // 打印为空
}, 1000) 

这就好比先把水龙头打开了,然后再放桶子,肯定会漏掉一些水。

当然,我们也可以调用 pause 关闭水龙头,比如下面这个例子在接收到第一批水后就关闭了水龙头:

const readStream = fs.createReadStream('./big.file')readStream.once('data', (chunk) => {readStream.pause()
}) 

buffer

上面代码调用 pause 后水源的水不会停止,会流到水龙头的一个 buffer 中,直到达到 highWaterMark (最高水位线)则停止:

我们可以通过代码验证一下:

const readStream = fs.createReadStream('./big.file')readStream.once('data', (chunk) => {readStream.pause()setTimeout(() => {console.log(readStream._readableState.length, // 水龙头 buffer 的大小readStream._readableState.highWaterMark // 最高水位线) // 65536 65536}, 1000)
}) 

而且,我们可以重新再次打开水龙头,此时会先消耗掉 buffer 中的水,然后再从源头读取,比如下面这个例子(文末 ChatGPT 给的例子也可以):

const readStream = fs.createReadStream('./big.file')readStream.once('data', (chunk) => {readStream.pause()setTimeout(() => {const data = readStream._readableState.buffer.head.data.toString()readStream.once('data', (chunk) => {console.log(data === chunk.toString()) // 第二次读到的数据确实是来自上次 pause 后存放到 buffer 中的})readStream.resume()}, 2000)
}) 

使用 read 来手动取水

有没有发现,上面这些例子都是水龙头来多少水(即代码中的 chunk)我们就接多少水,有没有可能我们自己控制接水的多少呢?答案是肯定的,我们可以调用 read 这个方法,比如下面这个例子:

const readStream = fs.createReadStream('./big.file')
console.log(readStream(100)) 

不过,上面的这个代码是读不到数据的。原因在于,read 方法是从 buffer 中读取数据,而此时 buffer 里面还是空的。我们需要这样:

const readStream = fs.createReadStream('./big.file')
readStream.on('readable', () => {let chunkconsole.log('Stream is readable (new data received in buffer)')while (null !== (chunk = readStream.read(100))) {console.log(`Read ${chunk.length} bytes of data from buffer`)}
}) 

调用 on('readable'... 会触发水源往 buffer 中灌水,当 buffer 中灌满水后,会调用 readable 的回调函数,此时可以通过 read 方法来消费 buffer 中的水。这里有个问题,当我们 read 的数据超过了 buffer 中的怎么办?我们来实验一下:

const readStream = fs.createReadStream('./big.file')
readStream.on('readable', () => {let chunkconsole.log('Stream is readable (new data received in buffer)')console.log(readStream._readableState.highWaterMark,readStream._readableState.length)// Use a loop to make sure we read all currently available datawhile (null !== (chunk = readStream.read(65537))) {console.log(`Read ${chunk.length} bytes of data...`)}
}) 

运行后,控制台打印如下:

Stream is readable (new data received in buffer)
65536 65536
Stream is readable (new data received in buffer)
131072 196608
Read 65537 bytes of data...
Read 65537 bytes of data...
Stream is readable (new data received in buffer)
131072 196606
Read 65537 bytes of data...
Read 65537 bytes of data...
Stream is readable (new data received in buffer)
131072 196604
... 

分析这个日志,我们发现第一次 readable 事件并没有进入 while 循环,且第一次之后 highWaterMark 的值增加了。经过一番源码调试后,我得到了结论,图示如下:

第一次触发 readable 事件,此时 buffer 中的数据为 65536,而我们需要读取 65537 的数据,数据不够 read 返回 null。并且发现 read 读取的数据大于 highWaterMark,所以更新该参数为原来的两倍,即 131072,然后以该值从水源中再读入一段数据到一个新的节点中 (buffer 是一个链表)。

然后,触发第二次 readable 事件,此时 buffer 数据总长度为 65536 + 131072 = 196608,我们可以读入两次 65537 的数据。此时 buffer 数据总长度变为 196608 - 2 * 65537 = 65534,数据又不够了,read 返回 null,且由于 read 读取的数据小于 highWaterMark,不需要更新,仍然以原来的值从水源中再读入一段数据到一个新的节点中。

然后,触发第三次 readable

除了使用这些已有的 Readable Stream,Node.js 还支持我们自定义。

自定义 Readable Stream

自定义 Readable Stream 有以下两种方式:

// 1
const {Readable} = require('stream')const readableStream = new Readable({read() {this.push('Data 1\n')this.push('Data 2\n')this.push(null)},
})// 2
const {Readable} = require('stream')class CustomReadable extends Readable {constructor(options) {super(options)this.data = ['Data 1\n', 'Data 2\n']}_read() {const chunk = this.data.shift()if (!chunk) {this.push(null)} else {this.push(chunk)}}
}
const readableStream = new CustomReadable() 

其中,调用 push 就是上面说的“水源往水龙头中注水”的动作,该方法中传入 null 表示水源中的水已注完。

下面,我们写一个可以不断产生大写字母的 Readable Stream

const {Readable} = require('stream')class CustomReadable extends Readable {constructor(options) {super(options)this.charCode = 65}_read(n) {for (let i = 0; i < n; i++) {this.push(String.fromCharCode(this.charCode++))if (this.charCode === 91) this.charCode = 65}}
}const readStream = new CustomReadable({highWaterMark: 1})
readStream.on('data', (chunk) => {console.log(chunk.toString())
}) 

注意,我们初始化 readStream 的时候传入了 highWaterMark,这样每次调用 _read 的时候参数就是 1 了。

到此,Readalbe Stream 的核心基本上就介绍完了,接下来介绍 Writable Stream

Writable Stream

我们把 Writable Stream 比作一个有入口和出口的池子:

池子的水最终流向哪,需要具体的 Writable Stream 来实现。比如 fs.createWriteStream 创建的 Writable Stream 其水流向文件,process.stdout 水流向标准输出。

两种工作模式

水池也有两种工作模式,一种是入口来的水直接流向出口(此时,相当于在入口和出口间接了一根水管),一种是入口的水先流到池子中(源码中是存在 buffered 这个属性中),出口慢慢进行消费:

我们初始化一个 Writable Stream 时,然后写一些数据试试:

const writeStream = fs.createWriteStream('./file')
writeStream.write('a') 

此时,采用的是第二种模式。如何切换成第一种模式呢?可以这样:

const writeStream = fs.createWriteStream('./file')
writeStream.on('open', () => {writeStream.write('a')
}) 

通过对比,我想你应该恍然大悟了。第一段代码 writeStream 初始化后,可能出口那边还没有准备好,此时往池子中灌水显然只能先放到池子里。第二段代码是在 writeStreamopen 事件触发后再往水池中灌水,此时出口已就绪,可以直接流出了。

cork 和 uncork

Writable Stream 还有一个比较有趣的方法是 cork,即把出口塞住,此时水池的工作模式变为第二种(很显然,出口塞住了,只能先把水灌到池子里)。比如,下面这个例子:

const writeStream = fs.createWriteStream('./file')
writeStream.cork()
writeStream.on('open', () => {writeStream.write('a') // 不会写入到磁盘console.log(writeStream._writableState.buffered[0].chunk.toString()) // a
}) 

我们可以通过调用 uncork 重新打开出口,比如下面这个例子:

const writeStream = fs.createWriteStream('./file')
writeStream.cork()
writeStream.on('open', () => {writeStream.write('a') // 不会写入到磁盘console.log(writeStream._writableState.buffered[0].chunk.toString()) // asetTimeout(() => {writeStream.uncork() // 打开出口,水会流完console.log(writeStream._writableState.buffered[0]) // undefined}, 1000)
}) 

write 的返回值

write 函数是有返回值的,当返回 false 时,表示池子中的水位超过了 highWaterMark(16 KB),此时正确的做法应该停止继续往池子中注水,等待池子中的水排干了(即触发 drain 事件)再继续注水:

const writeStream = fs.createWriteStream('./file3')
const ret = writeStream.write(Array(20000).fill('a').join(''))if (!ret) {writeStream.on('drain', () => {writeStream.write(Array(1).fill('b').join(''))console.log(fs.readFileSync('./file2').length) // 20001})
} 

但是事实上你也可以什么都不管,一直注水:

const writeStream = fs.createWriteStream('./file3')
let i = 0
while (i++ < 999999) {writeStream.write(Array(999999).fill('a').join(''))
} 

这样的后果是机器内存会全部占满,注意这里不会受 Node.js 的运行内存限制,因为 write 的数据最后都会转为 Buffer

而且 Node.js 还会自动去刷 buffered 中的数据(这一块没有仔细研究,结论是通过实验得出的),比如上面的代码改成如下这样内存就不会一直增长了:

const writeStream = fs.createWriteStream('./file3')
let i = 0
const tId = setInterval(() => {writeStream.write(Array(20000).fill('a').join(''))console.log(writeStream._writableState.buffered.length)i++if (i > 999999) clearInterval(tId)
}) 

这里跟上面不同的是,write 是放在定时器中调用的,这就给了 Node.js 去刷数据的机会。

自定义 Writable Stream

同样的,自定义 Writable Stream 也有两种方式:

// 1
const writableStream = new Writable({write(chunk, encoding, callback) {console.log(chunk)callback()},
})
// 2
class CustomWritable extends Writable {_write(chunk, encoding, callback) {console.log(chunk)callback()}
}const writableStream = new CustomWritable() 

比如,我们可以写一个简单的写入文件的 Stream:

class FileWriter extends Writable {constructor(options) {super(options)this.fileName = options.fileName}_write(chunk, encoding, callback) {fs.appendFile(this.fileName, chunk, (err) => {if (err) return callback(err)callback()})}end(callback) {console.log(`Finished writing to file: ${this.fileName}`)callback()}
}const writableStream = new FileWriter({fileName: 'output.txt'})
writableStream.write('Data 1\n')
writableStream.write('Data 2\n')
writableStream.end(() => {console.log('Finished writing to stream')
}) 

以上就是 Writable Stream 的核心了,其他方法和事件比较简单,就不过多介绍了。

介绍完这两个东西,接下来我们把他们合起来再讨论讨论。

Readable Stream + Writable Stream

同时讨论这两个东西,最经典的莫过于 pipe 了,比如下面这个代码:

const fs = require('fs')const readStrem = fs.createReadStream('./file')
const writeStream = fs.createWriteStream('./out')readStrem.pipe(writeStream) 

起作用就相当于把水龙头和水池用一个管子连起来了:

这样,水就源源不断地从水源处流向目标了。

其原理也是监听了 Readable Streamdata 事件,获取到 chunk 写入 Writable Stream

src.on('data', ondata)
function ondata(chunk) {debug('ondata')const ret = dest.write(chunk)debug('dest.write', ret)if (ret === false) {pause()}
} 

不过,从代码中可以看到,pipe 还帮我们处理了当水源放水速度大于水池出水速度的场景。

这种场景下,某一个时刻,水池中的水会超过水池的最高水位线,此时 write 返回 false,水龙头会 pause

当水池中的水流完,会触发 drain,水龙头会 resume

Object Mode

Stream 默认只支持 String 和 Buffer,不过我们也可以改为使用 Object Mode。下面这个例子展示了如何开启 Object Mode:

const {Writable} = require('stream')class ObjectWriter extends Writable {constructor(options) {super({objectMode: true,...options,})}_write(chunk, encoding, callback) {console.log(`Received object: ${JSON.stringify(chunk)}`)callback()}
}const writer = new ObjectWriter()
writer.write({message: 'Hello, World!'})
writer.end() 

如果改成 false,则调用 write 的时候会报错:

TypeError [ERR_INVALID_ARG_TYPE]: The "chunk" argument must be of type string or an instance of Buffer or Uint8Array. Received an instance of Object 

到这里,Stream 的基础知识就介绍得差不多了,不过还有两个比较高级的 Stream 也不能漏了。

Duplex

Duplex 的意思是双重的,即一个 Stream 同时可用作 Readable StreamWritable Stream

下面这个例子实现了一个简单的 Duplex

const {Duplex} = require('stream')class UpperEchoStream extends Duplex {constructor(options) {super(options)this.buffer = []}_write(chunk, encoding, callback) {this.buffer.push(chunk)callback()}_read(size) {while (this.buffer.length > 0) {const chunk = this.buffer.shift()if (!this.push(chunk.toString().toUppercase())) {break}}}
}const echo = new UpperEchoStream()
echo.pipe(process.stdout)
echo.write('Hello, World!')
echo.end() 

上面的代码可以用下面这个图来表示:

其功能就是将输入转换成了大写,然后交给下一个 Writable Stream。其实对于这种对数据进行转换的功能,Node.js 还提供了一种特别的 Duplex,叫做 Tranform。还是上面的例子,用 Tranform 来实现更加简洁:

const {Transform} = require('stream')class UpperEchoStream extends Transform {_transform(chunk, encoding, callback) {callback(null, chunk.toString().toUpperCase())}
}const echo = new UpperEchoStream()
echo.pipe(process.stdout)
echo.write('Hello, World!')
echo.end() 

以上就是 Stream 的所有核心内容了,写文章太累了,头都要秃了!如果觉得有帮助的话,求点赞、收藏、转发,求关注公众号“前端游”!

补充内容

和 ChatGPT 讨论技术

## 最后 整理了一套《前端大厂面试宝典》,包含了HTML、CSS、JavaScript、HTTP、TCP协议、浏览器、VUE、React、数据结构和算法,一共201道面试题,并对每个问题作出了回答和解析。 **有需要的小伙伴,可以点击文末卡片领取这份文档,无偿分享**

部分文档展示:



文章篇幅有限,后面的内容就不一一展示了

有需要的小伙伴,可以点下方卡片免费领取

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/15447.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

《不想放水》

原创&#xff1a;刘教链 * * * 由刘教链和ChatGPT共同填词。原曲&#xff1a;S.H.E.《不想长大》。 为什么就是找不到放水的证据呀 为什么救助的银行都不愿退市啊 我并不希望他拥有比特币和黄金 我惊讶的是假话竟然会变成谎话 为什么美元印钞机要转的那么快 为什么通胀和失业率…

存储器介绍

文章目录 存储系统基本概念存储器的层次存储器的分类存储器的性能指标 主存储器半导体元件的原理存储芯片的基本原理寻址 DRAM和SRAMDRAM的刷新DRAM的地址复用 只读存储器ROM主存储器和CPU的连接位扩展字扩展字位同时扩展补充 双端口RAM和多模块存储器双端口RAM解决多核CPU访存…

OpenAI文档翻译——在不通的场景下如何更好的设计ChatGPT提示词

概述 OpenAI可以被广泛的应用于各种任务&#xff0c;他为各种模型提供使用简单而功能强大的API。你可以输入一些文本作为提示词&#xff0c;OpenAI则会生成对应的提示词补全&#xff0c;在使用过程中这就是会话形式以及能够记住上下文的体现。探索如何生成提示词的最好方法就是…

如何使用ChatGPT 写官方声明?

上海车展宝马Mini展台被指区别对待中外访客&#xff0c;向外国访客送冰淇淋&#xff0c;中国访客索要时则说“没有”&#xff0c;此事引发争议。 对此&#xff0c;宝马官方也发布了官方致歉声明&#xff0c;网友看到声明后&#xff0c;纷纷发布自己的看法&#xff0c;有网友还…

独家专访:OpenAI 的 Sam Altman 谈 ChatGPT 以及通用人工智能如何“打破资本主义”

来源: AI前线 作为 OpenAI 的首席执行官&#xff0c;Sam Altman 领导的这家初创公司是正在快速增长的生成式 AI 行业中最热门且最受关注的一家。在 1 月中旬参观了 OpenAI 的旧金山办事处后&#xff0c;福布斯采访了这位最近不太愿意在媒体上抛头露面的投资者和企业家&#x…

ChatGPT is not all you need,一文看尽SOTA生成式AI模型:6大公司9大类别21个模型全回顾(三)

文章目录 ChatGPT is not all you need&#xff0c;一文看尽SOTA生成式AI模型&#xff1a;6大公司9大类别21个模型全回顾&#xff08;三&#xff09;Text-to-Text 模型ChatGPTLaMDAPEERMeta AI Speech from Brain Text-to-Code 模型CodexAlphacode Text-to-Science 模型Galacti…

千万级入口服务[Gateway]框架设计(三:分层模式)

本文将以技术调研模式编写&#xff0c;非技术同学可跳过。 文章目录 背景分层分发Handle分发hook分发并发分层 管道ChannelDemo 实现 小结 附录 背景 基于组件(插件)模式设计构建的入口服务实现中&#xff0c;使用 Go 原生包 plugin 的时候&#xff0c;会存在功能缺陷问题&am…

ChatGPT配合两款神器,1分钟生成流程图

流程图&#xff0c;工作上再正常不过的一种图形&#xff0c;常见制图方法对比&#xff1a; 传统手动制图&#xff1a;耗时耗力&#xff0c;迁移性差AI 辅助制图&#xff1a;使用自然语言提出需求&#xff0c;零基础快速制图 几款常见的我在用的在线绘图工具推荐&#xff1a; Pr…

面向对象实现游戏聊天中的敏感词屏蔽功能,将敏感词汇用星号***替换

import java.util.Scanner;//新建一个类 public class Replace {// 定义一个字符串String commons;// 建一个替换的方法public void replace() { //控制台输入Scanner sc new Scanner(System.in);// 提示输出&#xff08;为方便功能实现&#xff0c;此处提示内定的敏感词汇&am…

DreamGPT:让ChatGPT活在梦里!利用幻觉来激发创意灵感

本文来源 新智元 编辑&#xff1a;LRS 【新智元导读】语言模型幻觉从bug变feature&#xff01; ChatGPT最为人诟病的缺陷就是「胡编乱造」了&#xff0c;可以一本正经地讲一段林黛玉倒拔垂杨柳的故事。 对于真正想了解「林黛玉」或「倒拔垂杨柳」的人来说&#xff0c;这段回答…

这五个问题一下就看出阿里通义千问和ChatGPT的差距了

前言 阿里通义千问申请过了&#xff0c;为了看看达到了什么水平&#xff0c;于是我问题了5个ChatGPT回答过的问题1&#xff0c;这五个问题网上都是没有的&#xff0c;是我自己想出来的。 问题一:小明说今天他吃了一只公鸡蛋&#xff0c;请问小明诚实吗&#xff1f; ChatGPT 这…

chatgpt赋能python:Python模拟终端:打造简单易用的命令行工具

Python模拟终端&#xff1a;打造简单易用的命令行工具 在现代编程中&#xff0c;命令行工具被广泛使用。无论是开发、测试还是维护&#xff0c;都需要通过命令行工具完成。Python的灵活性和强大的库使得开发命令行工具变得更加容易。本文将介绍如何使用Python编写一个简单易用…

Llama 2高调开源颠覆大模型圈!2万亿token训练,打不过GPT3.5

​ 编辑 添加图片注释&#xff0c;不超过 140 字&#xff08;可选&#xff09; 来源 | 新智源 ID | AI-era 一觉醒来&#xff0c;Meta直接丢了一颗重磅核弹&#xff1a;Llama 2&#xff01; 继LLaMA开源后&#xff0c;Meta今天联手微软高调开源Llama 2&#xff0c;一共有7B、…

对话哥伦比亚大学教授俞舟:人工智能公司的竞争,到最后还是产品和服务的竞争...

视学算法报道 编辑&#xff1a;杨德泽 在对谈中&#xff0c;俞舟不像其他创业者那样大谈 ChatGPT 对于行业的巨大影响力&#xff0c;她更加关注产品、更加关注使用产品的人&#xff0c;她不相信 AI 可以替代人类&#xff0c;而是帮助人类提升技能。 在俞舟开始她的对话系统创业…

昨晚,爆了。

昨天晚上&#xff0c;我们微信群爆炸了。 因为星球就聊挣钱&#xff0c;首期小红书电商训练营开放报名&#xff0c;第一期我们准备开放了100个名额。 结果&#xff0c;不到3分钟就爆满了100人&#xff0c;再我们赶紧结束报名的时候&#xff0c;又涌入了40人。 这是我们就聊挣钱…

马斯克在中国的44小时丨多少值得看

ChatGPT狂飙160天&#xff0c;世界已经不是之前的样子。 新建了人工智能中文站https://ai.weoknow.com 每天给大家更新可用的国内可用chatGPT资源 马斯克中国行的高度神秘性背后&#xff0c;预示特斯拉对于此次行程成果的看重和期待。 作者丨 魏帅 任娅斐 来源丨中国企业家杂…

chatgpt赋能python:Python图像拼接代码:将多张图片合并为一张

Python图像拼接代码&#xff1a;将多张图片合并为一张 如果您需要将多个图片合并成一张大图&#xff0c;那么Python图像拼接代码可以帮助您简化这个过程。这篇文章将为您介绍如何使用Python的Pillow库来合并多个图片&#xff0c;并且展示了一些关于图像拼接常见的问题和技巧。…

开源一个各种USB电缆的测试仪,再也不用担心被只有充电功能的数据线坑了

作者&#xff1a;晓宇&#xff0c;排版&#xff1a;晓宇 微信公众号&#xff1a;芯片之家&#xff08;ID&#xff1a;chiphome-dy&#xff09; 01 想知道你的USB Type C数据线是USB2或者USB3吗&#xff1f;大家是否有一些仅有充电功能的USB数据线在你的抽屉里&#xff0c;等待…

OpenAI再出新作,AIGC时代,3D建模师的饭碗危险了!

大家好,我是千与千寻,也可以叫我千寻哥,说起来,自从ChatGPT发布之后,我就开始焦虑,担心自己程序员的饭碗会不会哪天就被AIGC取代了。 有人说我是过度焦虑了,但是我总觉有点危机感肯定没有坏处。(结尾反转,hhh,希望你看下去!) 不过好家伙,还没等AIGC大模型完全替…

python编写照片中人脸标识并保存功能

这是我之前回答中的代码的更新版本&#xff0c;它检测图像中的人脸并将检测到的人脸保存为单独的图像&#xff1a; 需要安装opencv-python pip3 install opencv-python -i https://pypi.douban.com/sample/ 还需要训练模型&#xff08;人脸识别文件&#xff09;&#xff1a;…