javascript基础从小白到高手系列二千六百五十四:使用ReadableStream 主体
因为可以使用ReadableStream 创建Response 对象,所以就可以在读取流之后,将其通过管道。要将Uint8Array 转换为可读文本,可以将缓冲区传给TextDecoder,返回转换后的值。缓冲区的大小,以及浏览器是否等待缓冲区被填充后才将其推到流中,要根据JavaScript 运行时的。在这些例子中,当读取完Uint8Array 块之后,浏览器会将其标记为可以被垃圾回收。从TCP
JavaScript 编程逻辑很多时候会将访问网络作为原子操作,比如请求是同时创建和发送的,响应数
据也是以统一的格式一次性暴露出来的。这种约定隐藏了底层的混乱,让涉及网络的代码变得很清晰。
从TCP/IP 角度来看,传输的数据是以分块形式抵达端点的,而且速度受到网速的限制。接收端点
会为此分配内存,并将收到的块写入内存。Fetch API 通过ReadableStream 支持在这些块到达时就实
时读取和操作这些数据。
正如Stream API所定义的,ReadableStream 暴露了getReader()方法,用于产生ReadableStream-
DefaultReader,这个读取器可以用于在数据到达时异步获取数据块。数据流的格式是Uint8Array。
下面的代码调用了读取器的read()方法,把最早可用的块打印了出来:
fetch(‘https://fetch.spec.whatwg.org/’)
.then((response) => response.body)
.then((body) => {
let reader = body.getReader();
console.log(reader); // ReadableStreamDefaultReader {}
reader.read()
.then(console.log);
});
// { value: Uint8Array{}, done: false }
在随着数据流的到来取得整个有效载荷,可以像下面这样递归调用read()方法:
fetch(‘https://fetch.spec.whatwg.org/’)
.then((response) => response.body)
.then((body) => {
let reader = body.getReader();
function processNextChunk({value, done}) {
if (done) {
return;
}
console.log(value);
return reader.read()
.then(processNextChunk);
}
return reader.read()
.then(processNextChunk);
});
// { value: Uint8Array{}, done: false }
// { value: Uint8Array{}, done: false }
// { value: Uint8Array{}, done: false }
// …
异步函数非常适合这样的fetch()操作。可以通过使用async/await 将上面的递归调用打平:
fetch(‘https://fetch.spec.whatwg.org/’)
.then((response) => response.body)
.then(async function(body) {
let reader = body.getReader();
while(true) {
let { value, done } = await reader.read();
if (done) {
break;
}
console.log(value);
}
});
// { value: Uint8Array{}, done: false }
// { value: Uint8Array{}, done: false }
// { value: Uint8Array{}, done: false }
// …
另外,read()方法也可以真接封装到Iterable 接口中。因此就可以在for-await-of 循环中方
便地实现这种转换:
fetch(‘https://fetch.spec.whatwg.org/’)
.then((response) => response.body)
.then(async function(body) {
let reader = body.getReader();
let asyncIterable = {
Symbol.asyncIterator {
return {
next() {
return reader.read();
}
};
}
};
for await (chunk of asyncIterable) {
console.log(chunk);
}
});
// { value: Uint8Array{}, done: false }
// { value: Uint8Array{}, done: false }
// { value: Uint8Array{}, done: false }
// …
通过将异步逻辑包装到一个生成器函数中,还可以进一步简化代码。而且,这个实现通过支持只读
取部分流也变得更稳健。如果流因为耗尽或错误而终止,读取器会释放锁,以允许不同的流读取器继续
操作:
async function* streamGenerator(stream) {
const reader = stream.getReader();
try {
while (true) {
const { value, done } = await reader.read();
if (done) {
break;
}
yield value;
}
} finally {
reader.releaseLock();
}
}
fetch(‘https://fetch.spec.whatwg.org/’)
.then((response) => response.body)
.then(async function(body) {
for await (chunk of streamGenerator(body)) {
console.log(chunk);
}
});
在这些例子中,当读取完Uint8Array 块之后,浏览器会将其标记为可以被垃圾回收。对于需要在
不连续的内存中连续检查大量数据的情况,这样可以节省很多内存空间。
缓冲区的大小,以及浏览器是否等待缓冲区被填充后才将其推到流中,要根据JavaScript 运行时的
实现。浏览器会控制等待分配的缓冲区被填满,同时会尽快将缓冲区数据(有时候可能未填充数据)发
送到流。
不同浏览器中分块大小可能不同,这取决于带宽和网络延迟。此外,浏览器如果决定不等待网络,
也可以将部分填充的缓冲区发送到流。最终,我们的代码要准备好处理以下情况:
不同大小的Uint8Array 块;
部分填充的Uint8Array 块;
块到达的时间间隔不确定。
默认情况下,块是以Uint8Array 格式抵达的。因为块的分割不会考虑编码,所以会出现某些值作
为多字节字符被分散到两个连续块中的情况。手动处理这些情况是很麻烦的,但很多时候可以使用
Encoding API 的可插拔方案。
要将Uint8Array 转换为可读文本,可以将缓冲区传给TextDecoder,返回转换后的值。通过设
置stream: true,可以将之前的缓冲区保留在内存,从而让跨越两个块的内容能够被正确解码:
let decoder = new TextDecoder();
async function* streamGenerator(stream) {
const reader = stream.getReader();
try {
while (true) {
const { value, done } = await reader.read();
if (done) {
break;
}
yield value;
}
} finally {
reader.releaseLock();
}
}
fetch(‘https://fetch.spec.whatwg.org/’)
.then((response) => response.body)
.then(async function(body) {
for await (chunk of streamGenerator(body)) {
console.log(decoder.decode(chunk, { stream: true }));
}
});
// <!doctype html> …
// whether a <a data-link-type=“dfn” href=“#concept-header” …
// result to rangeValue. …
// …
因为可以使用ReadableStream 创建Response 对象,所以就可以在读取流之后,将其通过管道
导入另一个流。然后在这个新流上再使用Body 的方法,如text()。这样就可以随着流的到达实时检
查和操作流内容。下面的代码展示了这种双流技术:
fetch(‘https://fetch.spec.whatwg.org/’)
.then((response) => response.body)
.then((body) => {
const reader = body.getReader();
// 创建第二个流
return new ReadableStream({
async start(controller) {
try {
while (true) {
const { value, done } = await reader.read();
if (done) {
break;
}
// 将主体流的块推到第二个流
controller.enqueue(value);
}
} finally {
controller.close();
reader.releaseLock();
}
}
})
})
.then((secondaryStream) => new Response(secondaryStream))
.then(response => response.text())
.then(console.log);
// <!doctype html> …
更多推荐
所有评论(0)