文章目录
- nodejs架构
- 异步非阻塞IO 事件驱动 单线程
- global 对象
- path
- buffer
- fs
- fs处理文件打开和关闭
- fs使用buffer进行文件操作
- fs大文件传输和写入
- fs操作目录
- fs创建目录、删除目录
- 组件化前端开发
- commonjs
- module
- module.exports 和 exports 区别
- 模块加载流程
- vm模块
- 模块加载函数实现
- 事件模块
- EventEmitter实现
- 浏览器事件环执行顺序
- Nodejs事件环
- Stream模块
- 可读流
- 可写流
- 理解writeable的缓冲机制
- 理解drain事件的作用
- 模拟可读流及pipe的实现
- 单向链表的实现,模拟可写流实现
- 通信基本原理
- 创建tcp通信
- 基于封包拆包解决粘包问题
- http协议
nodejs架构
nodejs api的测试,是基于v16.20.2版本
异步非阻塞IO 事件驱动 单线程
- nodejs主线程是单线程
- 网络io/libuv适应当前平台
- 非网络io、非io线程池中的调度操作
global 对象
- js文件中 this默认为undefined
- __dirname
- __filename
- module
- exports
- require
- performance
path
import path from 'node:path';
import {
dirname
} from "node:path"
import {
fileURLToPath
} from "node:url"
const __filename = fileURLToPath(
import.meta.url);
const __dirname = dirname(__filename);
// 获取路径中最后名称, 第二个参数用于判断是否匹配扩展名,匹配就返回名字,不匹配返回完整名称
console.log(path.basename(__filename)); // test.js
console.log(path.basename(__filename, 'js')); // test
console.log(path.basename(__filename, 'css')); // test.js
console.log(path.basename('/a/b/c')); // c
console.log(path.basename('/a/b/c/')); // c
// 获取路径中目录
console.log(path.dirname(__filename)); // /Volumes/D2/project/template/iwantit/test
console.log(path.dirname('/a/b/c/')); // /a/b
// 获取路径中扩展名称
console.log(path.extname(__filename));
// 获取路径是否是绝对路径
console.log(path.isAbsolute(__filename));
// 拼接多个路径
console.log(path.join('/a', 'b')); // /a/b
// 返回绝对路径
// resolve([from], to)
console.log(path.resolve('./ds/d.html')); // /Volumes/D2/project/template/iwantit/test/ds/d.html
// 解析路径
console.log(path.parse(__filename));
// {
// root: '/',
// dir: '/Volumes/D2/project/template/iwantit/test',
// base: 'test.js',
// ext: '.js',
// name: 'test'
// }
// 序列化路径
console.log(path.format({
a: __filename,
b: "d"
}));
// 规范化路径 处理平台差异
console.log(path.normalize(__filename));
buffer
- IO行为操作的就是二进制数据
- buffer实现nodejs平台下的二进制数据操作
- stream操作配合管道实现数据分段,处理大型文件
- 数据的端到端传输会产生生产者和消费者
- 生产速度和消费速度不一致,往往存在等待
- buffer就是处理等待的数据的,属于缓冲区
- buffer无需require
- buffer不占据v8堆内存大小的使用空间
- buffer内存使用是node控制,回收 是v8负责
- buffer一般配合stream流使用,充当数据缓冲区
nodejs打印buffer返回的是16进制格式,16进制可用4位二进制表示,两个16进制共占用8位,也就是一个字节
// fill 依据传入的数据重复进行填写
let b1 = Buffer.alloc(6);
b1.fill(123, 1, 5); // 从1写到5
console.log(b1); // <Buffer 00 7b 7b 7b 7b 00>
console.log(b1.toString()); // {{{{
// write
let b1 = Buffer.alloc(6);
b1.write("123", 3, 2) // 从3开始写,写入长度为2
console.log(b1); // <Buffer 00 00 00 31 32 00>
console.log(b1.toString()); // 12
// toString 第二个参数表示从哪里开始读 第三个参数表示从哪里结束
let b1 = Buffer.from("乐观向上");
console.log(b1);
console.log(b1.toString('utf-8', 3)); // 观向上
// slice
let b1 = Buffer.from("乐观向上");
let b2 = b1.slice(3, 9);
console.log(b2);
console.log(b2.toString()); // 观向
// indexOf
let b1 = Buffer.from("乐观向上");
console.log(b1);
console.log(b1.indexOf('向')); // 6
// copy
let b1 = Buffer.alloc(6);
let b2 = Buffer.from('晴天');
b2.copy(b1, 3, 3, 6); // 写入对象, 从数据源哪个位置开始, 写入起始位置,写入结束位置
console.log(b1.toString()); // 天
console.log(b2.toString()); // 晴天
b2.copy(b1);
console.log(b1.toString()); // 晴天
// concat
let b1 = Buffer.from('很远很远的');
let b2 = Buffer.from('距离');
let a = Buffer.concat([b1, b2], 9);
console.log(a.toString()) // 很远很
let b = Buffer.concat([b1, b2]);
console.log(b.toString()) // 很远很远的距离
let bol = Buffer.isBuffer(b); // true
// split
Buffer.prototype.split = function(sep) {
let len = Buffer.from(sep).length;
let res = [];
let start = 0;
let offset = 0;
while ((offset = this.indexOf(sep, start)) !== -1) {
res.push(this.slice(start, offset))
start = offset + len;
}
res.push(this.slice(start))
return res;
}
let b1 = Buffer.from("乐观向上");
console.log(b1.split("观").map(v => v.toString()));
fs
-
fs基本操作类
- 权限位
- 用户对于文件所具备的操作权限
- 读 r 4
- 写 w 2
- 执行 x 1
- 无权限 0
-
drwxr-xr-x 第一位为d代表是文件夹
-
-rw-r—r— 第一位为-代表是文件, 每3位为一组
-
标识符
- r 表示可读
- w 表示可写
- s 表示同步
- - 表示执行相反的操作
- r+ 以读写模式打开文件。如果文件不存在,则会抛出错误。
- x 表示排它操作
- a 表示追加操作
-
文件描述符 fd
- 0、1、2 标准输入、标准输出、标准错误
- fs.open调用后会有个fd,从3开始
-
fs常用api
- readFile 从指定文件中读取数据
- writeFile 从指定文件中写入数据
- appendFile 追加的方式向指定文件中写入数据
- copyFile 将某个文件中的数据拷贝至另一个文件
- watchFile 对指定文件进行监控
fs.readFile(path.resolve('txt.txt'), 'utf-8', (err, data) => {
if (!err) {
console.log(data);
fs.writeFile(path.resolve('txt.txt'),
'ok', {
mode: 438, // 八进制0o666 6*64+6*8+6 = 438 表示可读写,但不可执行
flag: 'r+' // 用来控制如何读写,r+代表不清空写入
},
(err) => {
console.log(err);
})
} else {
console.log(err);
}
})
// 追加内容
fs.appendFile('txt.txt', '乐观向上', (err) => {
console.log('写入成功');
})
fs.copyFile('txt.txt', 'copy-txt.txt', (err) => {
console.log('复制成功');
})
import path from 'node:path';
import fs from 'node:fs';
import browserSync from 'browser-sync';
import {
dirname
} from "node:path"
import {
fileURLToPath
} from "node:url"
import {
marked
} from 'marked';
const __filename = fileURLToPath(
import.meta.url);
const __dirname = dirname(__filename);
const template = `
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Document</title>
<style>{{style}}</style>
</head>
<body>
{{content}}
</body>
</html>
`
let mdPath = path.resolve(__dirname, './index.md');
let cssPath = path.resolve(__dirname, './index.css');
let basename = path.basename(mdPath);
let extname = path.extname(basename);
let htmlPath = path.resolve(__dirname, basename.replace(extname, '.html'));
fs.watchFile(mdPath, (curr, prev) => {
if (curr.mtime !== prev.mtime) {
fs.readFile(mdPath, (err, data) => {
if (!err) {
const mdContent = marked.parse(data.toString());
let contentTemp = template.replace('{{content}}', mdContent);
fs.readFile(cssPath, (err, data) => {
if (!err) {
const cssStyle = data.toString();
contentTemp = contentTemp.replace('{{style}}', cssStyle);
};
fs.writeFile(htmlPath, Buffer.from(contentTemp), (err) => {
if (!err) {
console.log('文件生成成功');
}
})
})
}
})
}
})
browserSync.init({
browser: '',
server: __dirname,
open: true,
watch: true
})
fs处理文件打开和关闭
文件描述符的使用,在同一个进程中,文件关闭的时候,文件描述符会复用。
fs.open('./index.css', 'r', (err, fd) => {
console.log('css', fd);
})
fs.open('./index.html', 'r', (err, fd) => {
console.log('html', fd);
fs.close(fd, (err) => {
console.log('html 关闭成功');
fs.open('./index.js', 'r', (err, fd) => {
console.log('js', fd);
})
})
})
// css 20
// html 21
// html 关闭成功
// js 21
fs使用buffer进行文件操作
为啥要自己在buffer中进行读写?目的就是为了控制读取
和写入速度
,如果全交给nodejs来做,速度无法控制,可能会导致数据出现问题
let buf = Buffer.alloc(10);
/**
* 以下参数使用于 read和write方法
*
* fd 定位当前被打开的文件
* buf 用于表示当前缓冲区
* offset 表示当前从 buf 的哪个位置开始执行写入
* length 表示当前次写入的长度
* position 表示当前从文件的哪个位置开始读取
*/
// read 读操作就是讲数据从磁盘文件写入到buffer中
fs.open('data.txt', 'r', (err, rfd) => {
fs.read(rfd, buf, 1, 4, 3, (err, readBytes, data) => {
console.log(readBytes);
console.log(data.toString());
})
})
// write 将缓冲区的数据写入到磁盘文件当中
buf = Buffer.from("1234567890")
fs.open('back.txt', 'r+', (err, wfd) => {
fs.write(wfd, buf, 0, 3, 2, (err, writtenBytes, buffer) => {
console.log(writtenBytes);
console.log(buffer.toString());
fs.close(wfd)
})
})
fs大文件传输和写入
/**
* 1. 打开a文件,用read写入buffer
* 2. 打开b文件,用write将buffer写入b中
*/
let bufferSize = 10;
let buf = Buffer.alloc(bufferSize);
let offset = 0;
fs.open('a.txt', 'r', (err, rfd) => {
fs.open('b.txt', 'a+', (err, wfd) => {
function next() {
fs.read(rfd, buf, 0, bufferSize, offset, (err, readBytes, data) => {
if (!readBytes) {
fs.close(rfd);
fs.close(wfd);
return
}
fs.write(wfd, buf, 0, readBytes, offset, (err, writtenBytes) => {
console.log('写入成功');
offset += readBytes;
next();
})
})
}
next();
})
})
fs操作目录
fs.access('data.txt', (err) => !err && (console.log('有权限')))
fs.stat('data.txt', (err, statObj) => {
console.log(statObj.size);
console.log(statObj.isFile());
console.log(statObj.isDirectory());
})
// 默认非递归创建
fs.mkdir('a/b/c', {
recursive: true
}, (err) => {
if (err) {
console.log('创建失败');
} else {
console.log('创建成功');
}
})
// 默认只能删除空目录 递归为true可以删除非空目录
fs.rmdir('a/b/c', {
recursive: true
}, (err) => {
if (err) {
console.log('删除失败');
} else {
console.log('删除成功');
}
})
// 读取目录
fs.readdir('node_modules/.bin', (err, files) => {
console.log(files);
})
// unlink
fs.unlink('a.txt', (err) => {
console.log(err);
if (err) {
console.log('删除失败');
} else {
console.log('删除成功');
}
})
fs创建目录、删除目录
/**
* 1. 将来调用时需要接受类似 a/b/c,路径以 / 连接
* 2. 利用 / 进行分割,然后生成数组 ['a', 'b', 'c']
* 3. 对上述数组进行便利,我们需要拿到每一项,然后与前一项进行拼接
* 4. 判断拼接的路径是否可操作,有则存在,否则需创建
*/
function makeDirSync(dirPath) {
let items = dirPath.split(path.sep)
for (let i = 1; i <= items.length; i++) {
let dir = items.slice(0, i).join(path.sep)
try {
fs.accessSync(dir)
} catch (error) {
fs.mkdirSync(dir)
}
}
}
makeDirSync('a/b/c')
// 删除目录
async function mkDirAsync(dirPath, cb) {
const items = dirPath.split(path.sep);
for (let i = 1; i <= items.length; i++) {
const dir = items.slice(0, i).join(path.sep);
try {
await access(dir);
} catch (error) {
await mkdir(dir);
}
}
cb && cb()
}
mkDirAsync('a/b/c')
function rmDirAsync(dirPath, cb) {
// 判断path类型
fs.stat(dirPath, (err, statObj) => {
if (statObj.isDirectory()) {
fs.readdir(dirPath, (err, files) => {
console.log(files);
const dirs = files.map(item => {
return path.join(dirPath, item)
})
let i = 0;
function next() {
if (i >= dirs.length) return fs.rmdir(dirPath, cb);
rmDirAsync(dirs[i++], next)
}
next();
})
} else {
fs.unlink(dirPath, cb)
}
})
}
rmDirAsync('a/b', () => {
console.log('删除成功了');
})
组件化前端开发
- 命名冲突和污染
- 代码冗余,无效请求多
- 文件依赖关系复杂
commonjs
- 任意js文件就是一个模块,具有独立作用于
- 使用require导入其他模块
- 将模块ID传入require实现目标模块定位
- 模块加载都是同步的
module
- 任意js文件就是一个模块 可使用module属性
- id 模块标识符,一般是一个绝对路径
- filename 返回文件模块的绝对路径
- loaded 返回布尔值 表示模块是否完成架子啊
- parent 返回对象存放调用当前模块的模块
- children 返回数组,存放当前模块调用的其他模块
- exports 返回当前模块需要暴露的内容
- paths 返回数组 存放不同目录下的node_modules位置
module.exports 和 exports 区别
- 两者在引用上默认是一致的
- exports只可追加属性,不可直接赋值
- require 读入并执行一个模块文件,返回exports对象
- resolve 返回模块文件绝对路径
- extensions 依据后缀名执行解析操作
- main 返回主入口对象,脚本执行的那个文件即为主对象
模块加载流程
-
核心模块:node源码编译时写入到二进制文件中
-
文件模块:代码运行时,动态加载
-
加载流程
-
路径分析: 依据标识符确定模块位置
- 路径标识符
- 非路径标识符常见于核心模块
- 模块路径是根据当前目录开始向上查找每个node_modules,直到根目录
-
文件定位: 确定目标模块中具体的文件及文件类型
- require处理没有扩展名的路径,会默认按 js、json、node的顺序,先查找是否又对应的模块文件
- 如果以上还没找到,就会把路径当成一个npm包,然后查找package.json,使用JSON.parse()解析,取出main属性值
- main属性也没扩展名,也会默认按 js、json、node的顺序继续查找
- 如果以上还没找到,将index作为目标模块中的具体文件名称
- main属性也没扩展名,也会默认按 js、json、node的顺序继续查找
- 如果以上还没找到,就会把路径当成一个npm包,然后查找package.json,使用JSON.parse()解析,取出main属性值
- require处理没有扩展名的路径,会默认按 js、json、node的顺序,先查找是否又对应的模块文件
-
编译执行:采用对应的方式完成文件的编译执行
-
将某个具体类型的文件按照相应的方式进行编译和执行
-
创建新对象,按路径载入,完成编译执行
-
js文件执行
- 使用fs同步读入目标文件内容
- 对内容进行语法包装,生成可执行的js函数
- 调用函数时传入exports、module、require等属性
-
json文件是直接使用JSON.parse进行解析
-
缓存优先原则
-
-
vm模块
1. runInThisContext
// test.txt
var ttt = 'test vm'
// test.js
import vm from 'node:vm';
import fs from 'node:fs';
let content = fs.readFileSync('./test.txt', 'utf-8');
vm.runInThisContext(content);
console.log(ttt);
console.log(global.ttt); // test.txt的内容运行后,ttt被设置在全局对象上了
// test vm
// test vm
let localVar = 'initial value';
const vmResult = vm.runInThisContext('localVar = "vm";');
console.log(`vmResult: '${vmResult}', localVar: '${localVar}'`);
// vmResult: 'vm', localVar: 'initial value'
2. runInContext
const vm = require('node:vm');
const contextObject = {
globalVar: 1
};
vm.createContext(contextObject);
for (let i = 0; i < 10; ++i) {
vm.runInContext('globalVar *= 2;', contextObject);
}
console.log(contextObject);
// Prints: { globalVar: 1024 }
模块加载函数实现
// 路径分析
// 缓存优化
// 文件定位
// 编译执行
事件模块
-
nodejs是基于事件驱动的异步操作架构
-
内置events模块提供 EventEmitter 类
-
nodejs内置模块多数内置集成 EventEmitter
-
EventEmitter.on 相同事件,按注册顺序依次执行
-
EventEmitter.emit
-
EventEmitter.once
-
EventEmitter.off
EventEmitter实现
// on
// emit
// once
// off
浏览器事件环执行顺序
- 从上至下执行所有的同步代码
- 执行过程中将遇到的宏任务和微任务添加在相应的队列中
- 同步代码执行完毕后,执行满足条件的为任务回调
Nodejs事件环
-
宏任务队列顺序
- check: 执行setImmediate中的回调
- timers: 执行setTimout与setInterval回调
- pending callbacks: 执行系统操作的回调,例如 tcp和udp
- idle.prepare: 只在系统内部进行使用
- poll: 执行I/O相关的调用
- close callbacks: 执行close事件的回调
-
流程
- 执行同步代码,将不同的任务添加至相应的队列
- 所有同步执行后会执行满足条件的微任务
- 所有微任务执行后会执行check队列中满足的宏任务
- check中的宏任务执行完成后就会依次切换队列
- 注意:在每个宏任务完成后都会先清空微任务队列,如果微任务耗时较长,会阻塞队列
Stream模块
-
nodejs中的流就是处理流式数据的抽象接口
-
ls | grep *.js
-
流解决的常见问题
- 同步读取资源文件,用户需等待数据读取完成
- 资源文件一次性加载至内存,开销较大
-
流处理数据的优势
- 时间效率:流的分段处理可以同时操作多个数据chunk
- 空间效率:同一时间流无须占据大内存
- 使用方便:流配合管理,程序扩展变得更简单
Nodejs的流分类
- Readable: 可读流
- Writeable:可写流
Node.js 的可读流(Readable Stream)内部通过复杂的状态管理和缓冲区机制实现了高效的数据处理。以下是其核心原理和关键问题的解析: 一、可读流内部数据管理机制
-
缓冲区的创建与数据插入
• 可读流通过_read
方法从底层数据源(如文件、网络)读取数据,并调用push
方法将数据块插入内部缓冲区。 • 缓冲区使用链表结构(如BufferList
)实现,支持高效的数据插入和移除操作。链表每个节点存储一个数据块(Buffer 或字符串),通过highWaterMark
参数控制缓冲区最大容量。 • 当调用push
时,数据会被追加到链表尾部,并更新length
属性记录当前缓冲区总字节数。若缓冲区未满(未达highWaterMark
),流会继续触发_read
以加载更多数据。 -
数据读取的触发条件
• 流动模式(Flowing Mode):通过监听data
事件或调用pipe
方法,流会自动从缓冲区中拉取数据并触发事件。此时数据会被连续消费,直到缓冲区为空或达到highWaterMark
限制。 • 暂停模式(Paused Mode):通过监听readable
事件,需手动调用read()
方法逐块读取数据。每次读取后,流会根据剩余缓冲区空间决定是否触发新的_read
调用。
二、 readable
与 data
事件的切换原理
-
模式切换的触发方式
• 切换到流动模式:◦ 添加
data
事件监听器; ◦ 调用resume()
方法; ◦ 使用pipe
方法连接可写流。
• 切换回暂停模式:
◦ 移除所有 `data` 事件监听器并调用 `pause()` ;
◦ 调用 `unpipe()` 断开管道。
2. 底层状态管理
• 可读流通过内部状态对象 ReadableState
跟踪模式变化,关键字段包括:
◦ `flowing` :标识当前模式( `null` 初始态, `true` 流动模式, `false` 暂停模式);
◦ `needReadable` :标记是否需要触发 `readable` 事件;
◦ `reading` :标识是否正在从底层读取数据。
• 示例流程:当 data
事件触发时,流会进入流动模式,循环调用 read()
清空缓冲区;若缓冲区数据量低于 highWaterMark
,则触发 _read
填充新数据。
三、关键设计细节
-
背压控制(Backpressure)
• 当可写流处理速度慢于可读流时,缓冲区达到highWaterMark
,流会暂停_read
调用,防止内存溢出。 • 通过writable.write()
返回的布尔值判断下游处理能力,动态调整数据生产节奏。 -
事件循环与异步处理
• 数据读取和事件触发通过 Node.js 事件循环异步调度。例如,在流动模式中,data
事件可能批量触发以减少上下文切换开销。
- Duplex:双工流,可读写
- Transform:转换流,可读写,可数据转换
import {
Duplex,
Transform,
} from 'node:stream'
import fs from 'fs'
class MyTransform extends Transform {
constructor() {
super()
}
_transform(chunk, encoding, next) {
this.push(chunk.toString().toUpperCase())
next()
}
}
const myTransform = new MyTransform()
myTransform.write("d")
myTransform.write("e")
myTransform.write("f")
myTransform.on('data', (chunk) => console.log(`Received chunk: ${chunk}`))
// Received chunk: D
// Received chunk: E
// Received chunk: F
可读流
let rs = fs.createReadStream('./test.txt', {
flags: 'r',
encoding: null,
fd: null,
mode: 0o666,
autoClose: true,
start: 0,
// end: 13,
highWaterMark: 5, // 控制底层数据分片大小(每次读取3字节)
})
rs.on('data', (chunk) => {
console.log(chunk.toString())
rs.pause()
setTimeout(() => {
rs.resume()
}, 1000)
})
rs.on('readable', () => {
console.log('触发readable事件,当前缓冲区长度:', rs._readableState.length);
let chunk
while ((chunk = rs.read(3)) !== null) {
console.log('读取到数据:', chunk.toString());
console.log('读取后缓冲区长度:', rs._readableState.length);
}
// 触发readable事件,当前缓冲区长度: 5
// 读取到数据: 012
// 读取后缓冲区长度: 2
// 触发readable事件,当前缓冲区长度: 7
// 读取到数据: 345
// 读取后缓冲区长度: 4
// 读取到数据: 678
// 读取后缓冲区长度: 1
// 触发readable事件,当前缓冲区长度: 1
// 读取到数据: 9
// 读取后缓冲区长度: 0
})
rs.on('open', () => {
console.log('文件打开成功')
})
rs.on('close', () => {
// 可读流只要读了数据,就会触发close事件
console.log('文件关闭')
})
rs.on('end', () => {
// 缓冲区中没有数据时,才会触发end事件
// 适合在这里来整合数据,比如写入到数据库等操作
console.log('读取结束')
})
rs.on('error', (err) => {
console.log('读取错误', err)
})
可写流
import fs from 'fs'
const ws = fs.createWriteStream('output.txt', {
flags: 'w',
fd: null,
autoClose: true,
mode: 0o666,
encoding: 'utf8',
start: 0,
highWaterMark: 4
})
ws.write('hello ', () => {
console.log('写入完成1')
})
ws.write('world', () => {
console.log('写入完成2')
})
ws.on('open', () => {
console.log('文件打开了')
})
ws.on('error', () => {
console.log('xxxxxx错误发生')
})
// Calling the writable.end() method signals that no more data will be written to the Writable
ws.end('!')
// 文件关闭事件,需要触发end事件才会触发close事件
ws.on('close', () => {
console.log('文件关闭了')
})
理解writeable的缓冲机制
import fs from 'node:fs'
const ws = fs.createWriteStream('./output.txt', {
highWaterMark: 4
})
async function write(chunk) {
// write执行流程
let flag = ws.write('1')
console.log(flag) // true
flag = ws.write('2')
console.log(flag) // true
// flag为false,不代表写入失败,而是表示缓冲区已满,需要等待drain事件触发后再继续写入
flag = ws.write('3')
console.log(flag) // false
// 以上同步代码的写入操作,会将数据放入缓冲区,实际写入操作回调,被放入当前宏任务的微任务队列中
const p1 = new Promise((resolve) => {
setTimeout(() => {
// 2s后写入数据4到文件中
ws.write('4')
resolve()
}, 2000)
})
// 4的写入是另一个包含宏任务的微任务
await p1
// 但对于另一个宏任务中的写入操作,会在3秒后第二次写入数据5到文件中
setTimeout(() => {
// 因为await了2s,所有是4s后写入数据5到文件中
ws.write('5')
}, 2000)
// 监听drain事件,当缓冲区再次可用时,触发该事件
ws.on('drain', () => {
console.log('drain')
})
}
write()
// [email protected]版本
// 1. 创建可写流,设置高水位线为4,流对象初始化缓冲区及参数,注册流销毁及错误事件回调,保证
// 2. 每次调用write方法,将数据写入到缓冲区
// 3. 生产速度一般远大于消费速度,缓冲区会逐渐被填满
// 4. 第三次调用write方法,flag返回false,缓冲区已到达警戒线,需要手动限制继续写入数据到缓冲区,强行写入是被允许的,但是不推荐这么做
// 5. 生产者监听drain事件,当缓冲区再次可用时,触发drain事件,继续写入数据
// 6. 每完成一次宏任务,会执行一次写入操作
理解drain事件的作用
import fs from 'node:fs'
import {
truncate
} from 'node:fs/promises'
const ws = fs.createWriteStream('./output.txt', {
highWaterMark: 3
})
const data = '我很努力'
// ws.write(data) // 一次性写入,可能会有性能问题
let flag = true
let index = 0
function write() {
flag = true
while (flag && index < data.length) {
flag = ws.write(data[index])
index++
}
}
write()
ws.on('drain', () => {
console.log('drain triggered'); // 触发4次
flag = true
write()
})
模拟可读流及pipe的实现
import fs from 'node:fs'
import EventEmitter from 'node:events'
class MyReadStream extends EventEmitter {
constructor(options) {
super(options)
this.path = options.path
this.flag = options.flag || 'r'
this.mode = options.mode || 0o666
this.autoClose = options.autoClose || true
this.start = options.start || 0
this.end = options.end || null
this.highWaterMark = options.highWaterMark || 64 * 1024
this.fd = null
this.readOffset = 0
this.open()
this.on('newListener', (event, listener) => {
if (event === 'data') {
this.read()
}
})
this.drained = false
}
open() {
fs.open(this.path, this.flag, this.mode, (err, fd) => {
if (err) {
this.emit('error', err)
} else {
this.fd = fd
this.emit('open', this.fd)
}
})
}
read() {
if (this.drained) return
if (this.fd === null) return this.once('open', () => this.read())
let buf = Buffer.alloc(this.highWaterMark)
let howMuchToRead = this.end ? Math.min(this.end - this.start + 1, this.highWaterMark) : this.highWaterMark
fs.read(this.fd, buf, this.start, howMuchToRead, this.readOffset, (err, bytesRead) => {
if (err) {
this.emit('error', err)
} else if (bytesRead === 0) {
this.emit('end')
} else {
let data = buf.slice(0, bytesRead)
this.readOffset += bytesRead
this.emit('data', data)
process.nextTick(() => this.read())
}
})
}
close() {
if (this.fd === null) return this.once('open', () => this.close())
fs.close(this.fd, err => {
if (err) {
this.emit('error', err)
} else {
this.fd = null
this.emit('close')
}
})
}
pipe(ws) {
this.on('data', (chunk) => {
const ret = ws.write(chunk)
console.log(ret);
if (!ret) this.pause()
})
ws.on('drain', () => {
this.resume()
})
}
pause() {
this.drained = true
}
resume() {
this.drained = false;
this.read()
}
}
const rs = new MyReadStream({
path: './output.txt',
highWaterMark: 3
})
const ws = fs.createWriteStream('./output2.txt', {
highWaterMark: 1
})
rs.on('data', (chunk) => {
console.log(chunk.toString())
})
rs.on('end', () => {
console.log('文件读取完毕')
})
rs.pipe(ws)
单向链表的实现,模拟可写流实现
// linkList.ts
class LinkNode<T> {
data: T
next: LinkNode<T> | null = null
constructor(data: T, next?: LinkNode<T> | null) {
this.data = data
this.next = next || null
}
}
class LinkList<T> {
head: LinkNode<T> | null = null
size: number = 0
constructor() { }
validateIndex(index: number, checkRange: boolean = true) {
if (index < 0) {
throw new Error('Index must be greater than or equal to 0');
}
if (checkRange && index >= this.size) {
throw new Error('Index must be less than the size of the list');
}
return true;
}
private _get_node(index: number): LinkNode<T> {
let curNode = this.head!
for (let curIndex = 0; curIndex < index; curIndex++) {
curNode = curNode.next!
}
return curNode
}
add(content: T)
add(index: number, content: T)
add(indexOrContent: number | T, content?: T) {
if (typeof indexOrContent !== 'number') {
return this.add(0, indexOrContent);
}
const index = indexOrContent;
this.validateIndex(index, false);
let target: LinkNode<T> | null = null
if (index === 0) {
target = new LinkNode<T>(content!, this.head)
this.head = target
} else {
const prev = this._get_node(index - 1)
target = new LinkNode<T>(content!, prev.next)
prev.next = target
}
this.size++
return target
}
remove(index: number): LinkNode<T> | null {
this.validateIndex(index)
let rmTarget: LinkNode<T> | null = null
if (index === 0) {
rmTarget = this.head
this.head = this.head!.next
} else {
const prev = this._get_node(index - 1)
rmTarget = prev!.next
if (rmTarget === null) return null
prev!.next = rmTarget!.next
}
this.size--
return rmTarget
}
set(index: number, content: T) {
this.validateIndex(index)
let target: LinkNode<T> = this._get_node(index)
target.data = content
return target
}
get(index: number): LinkNode<T> | null {
this.validateIndex(index)
return this._get_node(index)
}
clear() {
this.head = null
this.size = 0;
}
}
export default LinkList
// queue.ts
import LinkList from "./linkList";
class Queue<T> {
queue: LinkList<T>;
constructor() {
this.queue = new LinkList()
}
enQueue(item: T) {
this.queue.add(item)
}
deQueue() {
if(this.isEmpty()) return null
return this.queue.remove(0)
}
isEmpty() {
return this.queue.size === 0
}
clear(){
this.queue.clear()
}
}
export default Queue
// writeStream.ts
import fs from 'fs'
import EventsEmitter from 'events'
import Queue from './queue'
interface WriteBuffer {
chunk: ArrayBuffer
encoding: string
cb: Function
}
class WriteStream extends EventsEmitter {
flags: string
mode: number
fd: number | null
autoClose: any
hightWaterMark: any
start: any
encoding: any
queue: Queue<WriteBuffer>
writing: boolean
writeLen: number
needDrain: boolean
writeOffset: number
constructor(private path, options: any) {
super()
this.path = path
this.flags = options.flags || 'w'
this.mode = options.mode || 0o666
this.autoClose = options.autoClose || true
this.hightWaterMark = options.highWaterMark || 16 * 1024
this.start = options.start || 0
this.encoding = options.encoding || 'utf8'
this.open()
this.queue = new Queue()
this.writing = false
this.writeLen = 0
this.writeOffset = 0
this.needDrain = false
}
open() {
fs.open(this.path, this.flags, (err, fd) => {
if (err) {
this.emit('error', err)
}
this.fd = fd
this.emit('open', fd)
})
}
write(chunk: ArrayBuffer | string, encoding: string, cb: Function) {
chunk = typeof chunk === 'string' ? Buffer.from(chunk, this.encoding) : chunk
this.writeLen += chunk.byteLength
let flag = this.writeLen < this.hightWaterMark
this.needDrain = !flag
if (this.writing) {
this.queue.enQueue({ chunk, encoding, cb })
} else {
this.writing = true
this._write(chunk, encoding, () => {
cb()
this._clearBuffer()
})
}
return flag
}
private _clearBuffer() {
let { data } = this.queue.deQueue() || {}
if (data) {
this._write(data.chunk, data.encoding, () => {
data.cb && data.cb()
this._clearBuffer()
})
} else {
if (this.needDrain) {
this.needDrain = false
this.emit('drain')
}
}
}
_write(chunk, encoding, cb) {
if (typeof this.fd !== 'number') {
return this.once('open', () => this._write(chunk, encoding, cb))
}
fs.write(this.fd, chunk, this.start, chunk.length, this.writeOffset, (err, written) => {
if (err) {
this.emit('error', err)
}
this.writeOffset += written
this.writeLen -= written
cb && cb()
})
}
}
export default WriteStream
通信基本原理
-
网络协议
- http 协议
- tcp/ip 协议
- arp 协议
- ftp 协议
-
通信的条件
- 主机之间需要有介质:蓝牙,wifi,有线网络等
- 主机必须有网卡设备:调制和解调,将二进制信息转换为高低电压的过程,就是调制的过程
- 主机之间需要协商通信协议和网络速率
-
局域网通信
- 交换机:局域网内,一台主机要和另一台主机通信,需要经过交换机转发
- 通过mac地址找到对应的设备,然后通过arp协议广播,告诉局域网内所有主机我的ip地址和mac地址的对应关系
- 局域网之间通信,需要通过路由器转发
-
网络通信模型
-
OSI 七层模型
- 应用层: 用户与网络的接口
- 表示层: 数据加密、转换、压缩
- 会话层: 控制网络连接建立与终止
- 传输层: 控制数据传输可靠性
- 网络层: 确定目标网络
- 数据链路层: 确定目标主机
- 物理层: 各种物理设备和标准
-
Tcp/Ip 五层模型
- 封装的过程
- 物理层 《- 数据链路层 《- 网络层 《- 传输层(Tcp) 《- 应用层(http)
- 0110110 《- 目标Mac 源Mac 《- 目标IP 源IP 《- 目标端口 源端口 《- data
- 封装和解封的过程是相反的
-
TCP 协议
- TCP 属于传输层协议
- TCP 是面向连接的协议
- TCP 用于处理实时通信
- 常见控制字段
- SYN=1表示请求建立连接
- FIN=1表示请求断开连接
- ACK=1表示数据信息确认
-
TCP握手三次, 为啥挥手要四次
- 握手中间那次是,服务端确认链接和请求链接合并为一次,而四次挥手则是分开的。
- 挥手是客户端与服务端断开连接的过程,中断前,服务器可能还要数据没发完,所以需要等数据发送完毕后,再继续后两次挥手
创建tcp通信
// server
import net from 'node:net'
const server = net.createServer()
server.listen(1234, 'localhost')
server.on('listening', () => {
console.log('服务器启动成功')
})
server.on('connection', (socket) => {
socket.on('data', (chunk) => {
console.log(chunk.toString());
socket.write('hello!' + chunk.toString())
})
})
// client
import net from 'node:net'
const client = net.createConnection({
port: 1234,
host: 'localhost'
})
client.on('connect', () => {
client.write('attacki')
// buffer缓冲区,可能会导致本想分开发送的数据,粘连在一起发送
// client.write('1')
// client.write('2')
// client.write('3')
// 服务端:123
for (let i = 0; i < 3; i++) {
// 利用宏任务队列来解决粘连问题
// 注意:如果是定时器相同的宏任务,仍可能粘连在一起发送
setTimeout(() => client.write(val + ''), (i + 1) * 1000)
}
})
client.on('data', (data) => {
console.log(data.toString())
})
基于封包拆包解决粘包问题
class MyTransform {
constructor(options) {
this.packageHeaderLen = 4
this.serialLen = 2
this.serialNum = 0
}
encode(buffer, serialNum) {
const body = Buffer.from(buffer)
const buf = Buffer.alloc(this.packageHeaderLen)
buf.writeInt16BE(serialNum || this.serialNum, 0)
buf.writeInt16BE(body.length, this.serialLen)
if (serialNum == undefined) this.serialNum++
return Buffer.concat([buf, body])
}
decode(buffer) {
const header = buffer.slice(0, this.packageHeaderLen)
const body = buffer.slice(this.packageHeaderLen)
return {
serialNum: header.readInt16BE(),
bodyLength: header.readInt16BE(this.serialLen),
body: body.toString()
}
}
getPackageLength(chunk) {
// 包长度小于
if (chunk.length < this.packageHeaderLen) return 0
return chunk.readInt16BE(this.serialLen) + this.packageHeaderLen
}
}
// server
import net from 'node:net'
import Transform from './transform.js'
const server = net.createServer()
server.listen(1234, 'localhost')
server.on('listening', () => console.log('服务器启动成功'))
const ts = new Transform()
let overRage = null
server.on('connection', (socket) => {
socket.on('data', (chunk) => {
if (overRage) {
chunk = Buffer.concat([overRage, chunk])
}
let length = 0
while (length = ts.getPackageLength(chunk)) {
const packContent = chunk.slice(0, length)
chunk = chunk.slice(length)
const data = ts.decode(packContent)
const serialNum = data.serialNum
const content = 'hello! ' + data.body
socket.write(ts.encode(content, serialNum))
}
overRage = chunk
})
})
server.on('close', () => {})
server.on('error', () => {})
// client
import net from 'node:net'
import Transform from './transform.js'
const client = net.createConnection({
port: 1234,
host: 'localhost'
})
const ts = new Transform()
let overRage = null
client.on('connect', () => {
client.write(ts.encode('attacki'))
client.write(ts.encode('david'))
client.write(ts.encode('arron'))
client.write(ts.encode('grayson'))
// for (let i = 0; i < 3; i++) {
// (function (val) {
//
// setTimeout(() => client.write(val + ''), (i + 1) * 1000)
// })(i)
// }
})
client.on('data', (chunk) => {
if (overRage) {
chunk = Buffer.concat([overRage, chunk])
}
let length
while (length = ts.getPackageLength(chunk)) {
const packContent = chunk.slice(0, length)
const data = ts.decode(packContent)
chunk = chunk.slice(length)
console.log(data)
}
overRage = chunk
})
client.on('close', () => {})
client.on('error', () => {})
http协议
- 缓存
- 编码
- 断点续传
- 防盗链