文章目录
- nodejs架构
 - 异步非阻塞IO 事件驱动 单线程
- global 对象
 - path
 - buffer
 - fs
 - fs处理文件打开和关闭
 - fs使用buffer进行文件操作
 - fs大文件传输和写入
 - fs操作目录
 - fs创建目录、删除目录
 - 组件化前端开发
 - commonjs
 - module
 - module.exports 和 exports 区别
 - 模块加载流程
 - vm模块
 - 模块加载函数实现
 - 事件模块
 - EventEmitter实现
 - 浏览器事件环执行顺序
 - Nodejs事件环
 - Stream模块
 - 可读流
 - 可写流
 - 理解writeable的缓冲机制
 - 理解drain事件的作用
 - 模拟可读流及pipe的实现
 - 单向链表的实现,模拟可写流实现
 - 通信基本原理
 - 创建tcp通信
 - 基于封包拆包解决粘包问题
 - http协议
 
 
nodejs架构
nodejs api的测试,是基于v16.20.2版本
异步非阻塞IO 事件驱动 单线程
- nodejs主线程是单线程
 - 网络io/libuv适应当前平台
 - 非网络io、非io线程池中的调度操作
 
global 对象
- js文件中 this默认为undefined
 - __dirname
 - __filename
 - module
 - exports
 - require
 - performance
 
path
import path from 'node:path';
import {
    dirname
} from "node:path"
import {
    fileURLToPath
} from "node:url"
const __filename = fileURLToPath(
    import.meta.url);
const __dirname = dirname(__filename);
// 获取路径中最后名称, 第二个参数用于判断是否匹配扩展名,匹配就返回名字,不匹配返回完整名称
console.log(path.basename(__filename)); // test.js
console.log(path.basename(__filename, 'js')); // test
console.log(path.basename(__filename, 'css')); // test.js
console.log(path.basename('/a/b/c')); // c
console.log(path.basename('/a/b/c/')); // c
// 获取路径中目录
console.log(path.dirname(__filename)); // /Volumes/D2/project/template/iwantit/test
console.log(path.dirname('/a/b/c/')); // /a/b
// 获取路径中扩展名称
console.log(path.extname(__filename));
// 获取路径是否是绝对路径
console.log(path.isAbsolute(__filename));
// 拼接多个路径
console.log(path.join('/a', 'b')); // /a/b
// 返回绝对路径
//  resolve([from], to)
console.log(path.resolve('./ds/d.html')); // /Volumes/D2/project/template/iwantit/test/ds/d.html
// 解析路径
console.log(path.parse(__filename));
// {
//   root: '/',
//   dir: '/Volumes/D2/project/template/iwantit/test',
//   base: 'test.js',
//   ext: '.js',
//   name: 'test'
// }
// 序列化路径
console.log(path.format({
    a: __filename,
    b: "d"
}));
// 规范化路径 处理平台差异
console.log(path.normalize(__filename));
buffer
- IO行为操作的就是二进制数据
 - buffer实现nodejs平台下的二进制数据操作
 - stream操作配合管道实现数据分段,处理大型文件
 - 数据的端到端传输会产生生产者和消费者
- 生产速度和消费速度不一致,往往存在等待
 - buffer就是处理等待的数据的,属于缓冲区
 - buffer无需require
 - buffer不占据v8堆内存大小的使用空间
 - buffer内存使用是node控制,回收 是v8负责
 - buffer一般配合stream流使用,充当数据缓冲区
 
 
nodejs打印buffer返回的是16进制格式,16进制可用4位二进制表示,两个16进制共占用8位,也就是一个字节
// fill 依据传入的数据重复进行填写
let b1 = Buffer.alloc(6);
b1.fill(123, 1, 5); // 从1写到5
console.log(b1); // <Buffer 00 7b 7b 7b 7b 00>  
console.log(b1.toString()); // {{{{
// write
let b1 = Buffer.alloc(6);
b1.write("123", 3, 2) // 从3开始写,写入长度为2
console.log(b1); // <Buffer 00 00 00 31 32 00>
console.log(b1.toString()); // 12
// toString 第二个参数表示从哪里开始读 第三个参数表示从哪里结束
let b1 = Buffer.from("乐观向上");
console.log(b1);
console.log(b1.toString('utf-8', 3)); // 观向上
// slice
let b1 = Buffer.from("乐观向上");
let b2 = b1.slice(3, 9);
console.log(b2);
console.log(b2.toString()); // 观向
// indexOf
let b1 = Buffer.from("乐观向上");
console.log(b1);
console.log(b1.indexOf('向')); // 6
// copy
let b1 = Buffer.alloc(6);
let b2 = Buffer.from('晴天');
b2.copy(b1, 3, 3, 6); // 写入对象, 从数据源哪个位置开始, 写入起始位置,写入结束位置
console.log(b1.toString()); // 天
console.log(b2.toString()); // 晴天
b2.copy(b1);
console.log(b1.toString()); // 晴天
// concat
let b1 = Buffer.from('很远很远的');
let b2 = Buffer.from('距离');
let a = Buffer.concat([b1, b2], 9);
console.log(a.toString()) // 很远很
let b = Buffer.concat([b1, b2]);
console.log(b.toString()) // 很远很远的距离
let bol = Buffer.isBuffer(b); // true
// split
Buffer.prototype.split = function(sep) {
    let len = Buffer.from(sep).length;
    let res = [];
    let start = 0;
    let offset = 0;
    while ((offset = this.indexOf(sep, start)) !== -1) {
        res.push(this.slice(start, offset))
        start = offset + len;
    }
    res.push(this.slice(start))
    return res;
}
let b1 = Buffer.from("乐观向上");
console.log(b1.split("观").map(v => v.toString()));
fs
- 
fs基本操作类
- 权限位
 - 用户对于文件所具备的操作权限
 - 读 r 4
 - 写 w 2
 - 执行 x 1
 - 无权限 0
 
 - 
drwxr-xr-x 第一位为d代表是文件夹
 - 
-rw-r—r— 第一位为-代表是文件, 每3位为一组
 - 
标识符
- r 表示可读
 - w 表示可写
 - s 表示同步
 - - 表示执行相反的操作
 - r+ 以读写模式打开文件。如果文件不存在,则会抛出错误。
 - x 表示排它操作
 - a 表示追加操作
 
 - 
文件描述符 fd
- 0、1、2 标准输入、标准输出、标准错误
 - fs.open调用后会有个fd,从3开始
 
 - 
fs常用api
- readFile 从指定文件中读取数据
 - writeFile 从指定文件中写入数据
 - appendFile 追加的方式向指定文件中写入数据
 - copyFile 将某个文件中的数据拷贝至另一个文件
 - watchFile 对指定文件进行监控
 
 
fs.readFile(path.resolve('txt.txt'), 'utf-8', (err, data) => {
    if (!err) {
        console.log(data);
        fs.writeFile(path.resolve('txt.txt'),
            'ok', {
                mode: 438, // 八进制0o666 6*64+6*8+6 = 438  表示可读写,但不可执行
                flag: 'r+' // 用来控制如何读写,r+代表不清空写入
            },
            (err) => {
                console.log(err);
            })
    } else {
        console.log(err);
    }
})
// 追加内容
fs.appendFile('txt.txt', '乐观向上', (err) => {
    console.log('写入成功');
})
fs.copyFile('txt.txt', 'copy-txt.txt', (err) => {
    console.log('复制成功');
})
import path from 'node:path';
import fs from 'node:fs';
import browserSync from 'browser-sync';
import {
    dirname
} from "node:path"
import {
    fileURLToPath
} from "node:url"
import {
    marked
} from 'marked';
const __filename = fileURLToPath(
    import.meta.url);
const __dirname = dirname(__filename);
const template = `
<!DOCTYPE html>
<html lang="en">
<head>
 <meta charset="UTF-8">
 <meta http-equiv="X-UA-Compatible" content="IE=edge">
 <meta name="viewport" content="width=device-width, initial-scale=1.0">
 <title>Document</title>
 <style>{{style}}</style>
</head>
<body>
 {{content}}
</body>
</html>
`
let mdPath = path.resolve(__dirname, './index.md');
let cssPath = path.resolve(__dirname, './index.css');
let basename = path.basename(mdPath);
let extname = path.extname(basename);
let htmlPath = path.resolve(__dirname, basename.replace(extname, '.html'));
fs.watchFile(mdPath, (curr, prev) => {
    if (curr.mtime !== prev.mtime) {
        fs.readFile(mdPath, (err, data) => {
            if (!err) {
                const mdContent = marked.parse(data.toString());
                let contentTemp = template.replace('{{content}}', mdContent);
                fs.readFile(cssPath, (err, data) => {
                    if (!err) {
                        const cssStyle = data.toString();
                        contentTemp = contentTemp.replace('{{style}}', cssStyle);
                    };
                    fs.writeFile(htmlPath, Buffer.from(contentTemp), (err) => {
                        if (!err) {
                            console.log('文件生成成功');
                        }
                    })
                })
            }
        })
    }
})
browserSync.init({
    browser: '',
    server: __dirname,
    open: true,
    watch: true
})
fs处理文件打开和关闭
文件描述符的使用,在同一个进程中,文件关闭的时候,文件描述符会复用。
fs.open('./index.css', 'r', (err, fd) => {
    console.log('css', fd);
})
fs.open('./index.html', 'r', (err, fd) => {
    console.log('html', fd);
    fs.close(fd, (err) => {
        console.log('html 关闭成功');
        fs.open('./index.js', 'r', (err, fd) => {
            console.log('js', fd);
        })
    })
})
// css 20
// html 21
// html 关闭成功
// js 21
fs使用buffer进行文件操作
为啥要自己在buffer中进行读写?目的就是为了控制读取和写入速度,如果全交给nodejs来做,速度无法控制,可能会导致数据出现问题
let buf = Buffer.alloc(10);
/**
 * 以下参数使用于 read和write方法
 * 
 * fd 定位当前被打开的文件
 * buf 用于表示当前缓冲区
 * offset 表示当前从 buf 的哪个位置开始执行写入
 * length 表示当前次写入的长度
 * position 表示当前从文件的哪个位置开始读取
 */
// read 读操作就是讲数据从磁盘文件写入到buffer中
fs.open('data.txt', 'r', (err, rfd) => {
    fs.read(rfd, buf, 1, 4, 3, (err, readBytes, data) => {
        console.log(readBytes);
        console.log(data.toString());
    })
})
// write 将缓冲区的数据写入到磁盘文件当中 
buf = Buffer.from("1234567890")
fs.open('back.txt', 'r+', (err, wfd) => {
    fs.write(wfd, buf, 0, 3, 2, (err, writtenBytes, buffer) => {
        console.log(writtenBytes);
        console.log(buffer.toString());
        fs.close(wfd)
    })
})
fs大文件传输和写入
/**
 * 1. 打开a文件,用read写入buffer
 * 2. 打开b文件,用write将buffer写入b中
 */
let bufferSize = 10;
let buf = Buffer.alloc(bufferSize);
let offset = 0;
fs.open('a.txt', 'r', (err, rfd) => {
    fs.open('b.txt', 'a+', (err, wfd) => {
        function next() {
            fs.read(rfd, buf, 0, bufferSize, offset, (err, readBytes, data) => {
                if (!readBytes) {
                    fs.close(rfd);
                    fs.close(wfd);
                    return
                }
                fs.write(wfd, buf, 0, readBytes, offset, (err, writtenBytes) => {
                    console.log('写入成功');
                    offset += readBytes;
                    next();
                })
            })
        }
        next();
    })
})
fs操作目录
fs.access('data.txt', (err) => !err && (console.log('有权限')))
fs.stat('data.txt', (err, statObj) => {
    console.log(statObj.size);
    console.log(statObj.isFile());
    console.log(statObj.isDirectory());
})
// 默认非递归创建
fs.mkdir('a/b/c', {
    recursive: true
}, (err) => {
    if (err) {
        console.log('创建失败');
    } else {
        console.log('创建成功');
    }
})
// 默认只能删除空目录 递归为true可以删除非空目录
fs.rmdir('a/b/c', {
    recursive: true
}, (err) => {
    if (err) {
        console.log('删除失败');
    } else {
        console.log('删除成功');
    }
})
// 读取目录
fs.readdir('node_modules/.bin', (err, files) => {
    console.log(files);
})
// unlink
fs.unlink('a.txt', (err) => {
    console.log(err);
    if (err) {
        console.log('删除失败');
    } else {
        console.log('删除成功');
    }
})
fs创建目录、删除目录
/**
 * 1. 将来调用时需要接受类似 a/b/c,路径以 / 连接
 * 2. 利用 / 进行分割,然后生成数组 ['a', 'b', 'c']
 * 3. 对上述数组进行便利,我们需要拿到每一项,然后与前一项进行拼接
 * 4. 判断拼接的路径是否可操作,有则存在,否则需创建
 */
function makeDirSync(dirPath) {
    let items = dirPath.split(path.sep)
    for (let i = 1; i <= items.length; i++) {
        let dir = items.slice(0, i).join(path.sep)
        try {
            fs.accessSync(dir)
        } catch (error) {
            fs.mkdirSync(dir)
        }
    }
}
makeDirSync('a/b/c')
// 删除目录
async function mkDirAsync(dirPath, cb) {
    const items = dirPath.split(path.sep);
    for (let i = 1; i <= items.length; i++) {
        const dir = items.slice(0, i).join(path.sep);
        try {
            await access(dir);
        } catch (error) {
            await mkdir(dir);
        }
    }
    cb && cb()
}
mkDirAsync('a/b/c')
function rmDirAsync(dirPath, cb) {
    // 判断path类型
    fs.stat(dirPath, (err, statObj) => {
        if (statObj.isDirectory()) {
            fs.readdir(dirPath, (err, files) => {
                console.log(files);
                const dirs = files.map(item => {
                    return path.join(dirPath, item)
                })
                let i = 0;
                function next() {
                    if (i >= dirs.length) return fs.rmdir(dirPath, cb);
                    rmDirAsync(dirs[i++], next)
                }
                next();
            })
        } else {
            fs.unlink(dirPath, cb)
        }
    })
}
rmDirAsync('a/b', () => {
    console.log('删除成功了');
})
组件化前端开发
- 命名冲突和污染
 - 代码冗余,无效请求多
 - 文件依赖关系复杂
 
commonjs
- 任意js文件就是一个模块,具有独立作用于
 - 使用require导入其他模块
 - 将模块ID传入require实现目标模块定位
 - 模块加载都是同步的
 
module
- 任意js文件就是一个模块 可使用module属性
 - id 模块标识符,一般是一个绝对路径
 - filename 返回文件模块的绝对路径
 - loaded 返回布尔值 表示模块是否完成架子啊
 - parent 返回对象存放调用当前模块的模块
 - children 返回数组,存放当前模块调用的其他模块
 - exports 返回当前模块需要暴露的内容
 - paths 返回数组 存放不同目录下的node_modules位置
 
module.exports 和 exports 区别
- 两者在引用上默认是一致的
 - exports只可追加属性,不可直接赋值
 - require 读入并执行一个模块文件,返回exports对象
 - resolve 返回模块文件绝对路径
 - extensions 依据后缀名执行解析操作
 - main 返回主入口对象,脚本执行的那个文件即为主对象
 
模块加载流程
- 
核心模块:node源码编译时写入到二进制文件中
 - 
文件模块:代码运行时,动态加载
 - 
加载流程
- 
路径分析: 依据标识符确定模块位置
- 路径标识符
 - 非路径标识符常见于核心模块
 - 模块路径是根据当前目录开始向上查找每个node_modules,直到根目录
 
 - 
文件定位: 确定目标模块中具体的文件及文件类型
- require处理没有扩展名的路径,会默认按 js、json、node的顺序,先查找是否又对应的模块文件
- 如果以上还没找到,就会把路径当成一个npm包,然后查找package.json,使用JSON.parse()解析,取出main属性值
- main属性也没扩展名,也会默认按 js、json、node的顺序继续查找
- 如果以上还没找到,将index作为目标模块中的具体文件名称
 
 
 - main属性也没扩展名,也会默认按 js、json、node的顺序继续查找
 
 - 如果以上还没找到,就会把路径当成一个npm包,然后查找package.json,使用JSON.parse()解析,取出main属性值
 
 - require处理没有扩展名的路径,会默认按 js、json、node的顺序,先查找是否又对应的模块文件
 - 
编译执行:采用对应的方式完成文件的编译执行
- 
将某个具体类型的文件按照相应的方式进行编译和执行
 - 
创建新对象,按路径载入,完成编译执行
 - 
js文件执行
- 使用fs同步读入目标文件内容
 - 对内容进行语法包装,生成可执行的js函数
 - 调用函数时传入exports、module、require等属性
 
 - 
json文件是直接使用JSON.parse进行解析
 - 
缓存优先原则
 
 - 
 
 - 
 
vm模块
1. runInThisContext
// test.txt
var ttt = 'test vm'
// test.js
import vm from 'node:vm';
import fs from 'node:fs';
let content = fs.readFileSync('./test.txt', 'utf-8');
vm.runInThisContext(content);
console.log(ttt);
console.log(global.ttt); // test.txt的内容运行后,ttt被设置在全局对象上了
// test vm  
// test vm  
let localVar = 'initial value';
const vmResult = vm.runInThisContext('localVar = "vm";');
console.log(`vmResult: '${vmResult}', localVar: '${localVar}'`);
// vmResult: 'vm', localVar: 'initial value'
2. runInContext
const vm = require('node:vm');
const contextObject = {
    globalVar: 1
};
vm.createContext(contextObject);
for (let i = 0; i < 10; ++i) {
    vm.runInContext('globalVar *= 2;', contextObject);
}
console.log(contextObject);
// Prints: { globalVar: 1024 }
模块加载函数实现
// 路径分析
// 缓存优化
// 文件定位
// 编译执行
事件模块
- 
nodejs是基于事件驱动的异步操作架构
 - 
内置events模块提供 EventEmitter 类
 - 
nodejs内置模块多数内置集成 EventEmitter
 - 
EventEmitter.on 相同事件,按注册顺序依次执行
 - 
EventEmitter.emit
 - 
EventEmitter.once
 - 
EventEmitter.off
 
EventEmitter实现
// on
// emit
// once
// off
浏览器事件环执行顺序
- 从上至下执行所有的同步代码
 - 执行过程中将遇到的宏任务和微任务添加在相应的队列中
 - 同步代码执行完毕后,执行满足条件的为任务回调
 
Nodejs事件环
- 
宏任务队列顺序
- check: 执行setImmediate中的回调
 - timers: 执行setTimout与setInterval回调
 - pending callbacks: 执行系统操作的回调,例如 tcp和udp
 - idle.prepare: 只在系统内部进行使用
 - poll: 执行I/O相关的调用
 - close callbacks: 执行close事件的回调
 
 - 
流程
- 执行同步代码,将不同的任务添加至相应的队列
 - 所有同步执行后会执行满足条件的微任务
 - 所有微任务执行后会执行check队列中满足的宏任务
 - check中的宏任务执行完成后就会依次切换队列
 - 注意:在每个宏任务完成后都会先清空微任务队列,如果微任务耗时较长,会阻塞队列
 
 
Stream模块
- 
nodejs中的流就是处理流式数据的抽象接口
 - 
ls | grep *.js
 - 
流解决的常见问题
- 同步读取资源文件,用户需等待数据读取完成
 - 资源文件一次性加载至内存,开销较大
 
 - 
流处理数据的优势
- 时间效率:流的分段处理可以同时操作多个数据chunk
 - 空间效率:同一时间流无须占据大内存
 - 使用方便:流配合管理,程序扩展变得更简单
 
 
Nodejs的流分类
- Readable: 可读流
 - Writeable:可写流
 
Node.js 的可读流(Readable Stream)内部通过复杂的状态管理和缓冲区机制实现了高效的数据处理。以下是其核心原理和关键问题的解析: 一、可读流内部数据管理机制
- 
缓冲区的创建与数据插入
• 可读流通过_read方法从底层数据源(如文件、网络)读取数据,并调用push方法将数据块插入内部缓冲区。 • 缓冲区使用链表结构(如BufferList)实现,支持高效的数据插入和移除操作。链表每个节点存储一个数据块(Buffer 或字符串),通过highWaterMark参数控制缓冲区最大容量。 • 当调用push时,数据会被追加到链表尾部,并更新length属性记录当前缓冲区总字节数。若缓冲区未满(未达highWaterMark),流会继续触发_read以加载更多数据。 - 
数据读取的触发条件
• 流动模式(Flowing Mode):通过监听data事件或调用pipe方法,流会自动从缓冲区中拉取数据并触发事件。此时数据会被连续消费,直到缓冲区为空或达到highWaterMark限制。 • 暂停模式(Paused Mode):通过监听readable事件,需手动调用read()方法逐块读取数据。每次读取后,流会根据剩余缓冲区空间决定是否触发新的_read调用。 
二、 readable 与 data 事件的切换原理
- 
模式切换的触发方式
• 切换到流动模式:◦ 添加
data事件监听器; ◦ 调用resume()方法; ◦ 使用pipe方法连接可写流。 
• 切换回暂停模式:
◦ 移除所有 `data` 事件监听器并调用 `pause()` ;
◦ 调用 `unpipe()` 断开管道。
2. 底层状态管理
• 可读流通过内部状态对象 ReadableState 跟踪模式变化,关键字段包括:
◦ `flowing` :标识当前模式( `null` 初始态, `true` 流动模式, `false` 暂停模式);
◦ `needReadable` :标记是否需要触发 `readable` 事件;
◦ `reading` :标识是否正在从底层读取数据。
• 示例流程:当 data 事件触发时,流会进入流动模式,循环调用 read() 清空缓冲区;若缓冲区数据量低于 highWaterMark ,则触发 _read 填充新数据。
三、关键设计细节
- 
背压控制(Backpressure)
• 当可写流处理速度慢于可读流时,缓冲区达到highWaterMark,流会暂停_read调用,防止内存溢出。 • 通过writable.write()返回的布尔值判断下游处理能力,动态调整数据生产节奏。 - 
事件循环与异步处理
• 数据读取和事件触发通过 Node.js 事件循环异步调度。例如,在流动模式中,data事件可能批量触发以减少上下文切换开销。 
- Duplex:双工流,可读写
 - Transform:转换流,可读写,可数据转换
 
import {
    Duplex,
    Transform,
} from 'node:stream'
import fs from 'fs'
class MyTransform extends Transform {
    constructor() {
        super()
    }
    _transform(chunk, encoding, next) {
        this.push(chunk.toString().toUpperCase())
        next()
    }
}
const myTransform = new MyTransform()
myTransform.write("d")
myTransform.write("e")
myTransform.write("f")
myTransform.on('data', (chunk) => console.log(`Received chunk: ${chunk}`))
// Received chunk: D
// Received chunk: E
// Received chunk: F
可读流
let rs = fs.createReadStream('./test.txt', {
    flags: 'r',
    encoding: null,
    fd: null,
    mode: 0o666,
    autoClose: true,
    start: 0,
    // end: 13,
    highWaterMark: 5, // 控制底层数据分片大小(每次读取3字节)
})
rs.on('data', (chunk) => {
    console.log(chunk.toString())
    rs.pause()
    setTimeout(() => {
        rs.resume()
    }, 1000)
})
rs.on('readable', () => {
    console.log('触发readable事件,当前缓冲区长度:', rs._readableState.length);
    let chunk
    while ((chunk = rs.read(3)) !== null) {
        console.log('读取到数据:', chunk.toString());
        console.log('读取后缓冲区长度:', rs._readableState.length);
    }
    // 触发readable事件,当前缓冲区长度: 5
    // 读取到数据: 012
    // 读取后缓冲区长度: 2
    // 触发readable事件,当前缓冲区长度: 7
    // 读取到数据: 345
    // 读取后缓冲区长度: 4
    // 读取到数据: 678
    // 读取后缓冲区长度: 1
    // 触发readable事件,当前缓冲区长度: 1
    // 读取到数据: 9
    // 读取后缓冲区长度: 0
})
rs.on('open', () => {
    console.log('文件打开成功')
})
rs.on('close', () => {
    // 可读流只要读了数据,就会触发close事件
    console.log('文件关闭')
})
rs.on('end', () => {
    // 缓冲区中没有数据时,才会触发end事件
    // 适合在这里来整合数据,比如写入到数据库等操作
    console.log('读取结束')
})
rs.on('error', (err) => {
    console.log('读取错误', err)
})
可写流
import fs from 'fs'
const ws = fs.createWriteStream('output.txt', {
    flags: 'w',
    fd: null,
    autoClose: true,
    mode: 0o666,
    encoding: 'utf8',
    start: 0,
    highWaterMark: 4
})
ws.write('hello ', () => {
    console.log('写入完成1')
})
ws.write('world', () => {
    console.log('写入完成2')
})
ws.on('open', () => {
    console.log('文件打开了')
})
ws.on('error', () => {
    console.log('xxxxxx错误发生')
})
// Calling the writable.end() method signals that no more data will be written to the Writable
ws.end('!')
// 文件关闭事件,需要触发end事件才会触发close事件
ws.on('close', () => {
    console.log('文件关闭了')
})
理解writeable的缓冲机制
import fs from 'node:fs'
const ws = fs.createWriteStream('./output.txt', {
    highWaterMark: 4
})
async function write(chunk) {
    // write执行流程
    let flag = ws.write('1')
    console.log(flag) // true
    flag = ws.write('2')
    console.log(flag) // true
    // flag为false,不代表写入失败,而是表示缓冲区已满,需要等待drain事件触发后再继续写入
    flag = ws.write('3')
    console.log(flag) // false
    // 以上同步代码的写入操作,会将数据放入缓冲区,实际写入操作回调,被放入当前宏任务的微任务队列中
    const p1 = new Promise((resolve) => {
        setTimeout(() => {
            // 2s后写入数据4到文件中
            ws.write('4')
            resolve()
        }, 2000)
    })
    // 4的写入是另一个包含宏任务的微任务
    await p1
    // 但对于另一个宏任务中的写入操作,会在3秒后第二次写入数据5到文件中
    setTimeout(() => {
        // 因为await了2s,所有是4s后写入数据5到文件中
        ws.write('5')
    }, 2000)
    // 监听drain事件,当缓冲区再次可用时,触发该事件
    ws.on('drain', () => {
        console.log('drain')
    })
}
write()
// [email protected]版本
// 1. 创建可写流,设置高水位线为4,流对象初始化缓冲区及参数,注册流销毁及错误事件回调,保证
// 2. 每次调用write方法,将数据写入到缓冲区
// 3. 生产速度一般远大于消费速度,缓冲区会逐渐被填满
// 4. 第三次调用write方法,flag返回false,缓冲区已到达警戒线,需要手动限制继续写入数据到缓冲区,强行写入是被允许的,但是不推荐这么做
// 5. 生产者监听drain事件,当缓冲区再次可用时,触发drain事件,继续写入数据
// 6. 每完成一次宏任务,会执行一次写入操作
理解drain事件的作用
import fs from 'node:fs'
import {
    truncate
} from 'node:fs/promises'
const ws = fs.createWriteStream('./output.txt', {
    highWaterMark: 3
})
const data = '我很努力'
// ws.write(data) // 一次性写入,可能会有性能问题
let flag = true
let index = 0
function write() {
    flag = true
    while (flag && index < data.length) {
        flag = ws.write(data[index])
        index++
    }
}
write()
ws.on('drain', () => {
    console.log('drain triggered'); // 触发4次
    flag = true
    write()
})
模拟可读流及pipe的实现
import fs from 'node:fs'
import EventEmitter from 'node:events'
class MyReadStream extends EventEmitter {
    constructor(options) {
        super(options)
        this.path = options.path
        this.flag = options.flag || 'r'
        this.mode = options.mode || 0o666
        this.autoClose = options.autoClose || true
        this.start = options.start || 0
        this.end = options.end || null
        this.highWaterMark = options.highWaterMark || 64 * 1024
        this.fd = null
        this.readOffset = 0
        this.open()
        this.on('newListener', (event, listener) => {
            if (event === 'data') {
                this.read()
            }
        })
        this.drained = false
    }
    open() {
        fs.open(this.path, this.flag, this.mode, (err, fd) => {
            if (err) {
                this.emit('error', err)
            } else {
                this.fd = fd
                this.emit('open', this.fd)
            }
        })
    }
    read() {
        if (this.drained) return
        if (this.fd === null) return this.once('open', () => this.read())
        let buf = Buffer.alloc(this.highWaterMark)
        let howMuchToRead = this.end ? Math.min(this.end - this.start + 1, this.highWaterMark) : this.highWaterMark
        fs.read(this.fd, buf, this.start, howMuchToRead, this.readOffset, (err, bytesRead) => {
            if (err) {
                this.emit('error', err)
            } else if (bytesRead === 0) {
                this.emit('end')
            } else {
                let data = buf.slice(0, bytesRead)
                this.readOffset += bytesRead
                this.emit('data', data)
                process.nextTick(() => this.read())
            }
        })
    }
    close() {
        if (this.fd === null) return this.once('open', () => this.close())
        fs.close(this.fd, err => {
            if (err) {
                this.emit('error', err)
            } else {
                this.fd = null
                this.emit('close')
            }
        })
    }
    pipe(ws) {
        this.on('data', (chunk) => {
            const ret = ws.write(chunk)
            console.log(ret);
            if (!ret) this.pause()
        })
        ws.on('drain', () => {
            this.resume()
        })
    }
    pause() {
        this.drained = true
    }
    resume() {
        this.drained = false;
        this.read()
    }
}
const rs = new MyReadStream({
    path: './output.txt',
    highWaterMark: 3
})
const ws = fs.createWriteStream('./output2.txt', {
    highWaterMark: 1
})
rs.on('data', (chunk) => {
    console.log(chunk.toString())
})
rs.on('end', () => {
    console.log('文件读取完毕')
})
rs.pipe(ws)
单向链表的实现,模拟可写流实现
// linkList.ts
class LinkNode<T> {
  data: T
  next: LinkNode<T> | null = null
  constructor(data: T, next?: LinkNode<T> | null) {
    this.data = data
    this.next = next || null
  }
}
class LinkList<T> {
  head: LinkNode<T> | null = null
  size: number = 0
  constructor() { }
  validateIndex(index: number, checkRange: boolean = true) {
    if (index < 0) {
      throw new Error('Index must be greater than or equal to 0');
    }
    if (checkRange && index >= this.size) {
      throw new Error('Index must be less than the size of the list');
    }
    return true;
  }
  private _get_node(index: number): LinkNode<T> {
    let curNode = this.head!
    for (let curIndex = 0; curIndex < index; curIndex++) {
      curNode = curNode.next!
    }
    return curNode
  }
  add(content: T)
  add(index: number, content: T)
  add(indexOrContent: number | T, content?: T) {
    if (typeof indexOrContent !== 'number') {
      return this.add(0, indexOrContent);
    }
    const index = indexOrContent;
    this.validateIndex(index, false);
    let target: LinkNode<T> | null = null
    if (index === 0) {
      target = new LinkNode<T>(content!, this.head)
      this.head = target
    } else {
      const prev = this._get_node(index - 1)
      target = new LinkNode<T>(content!, prev.next)
      prev.next = target
    }
    this.size++
    return target
  }
  remove(index: number): LinkNode<T> | null {
    this.validateIndex(index)
    let rmTarget: LinkNode<T> | null = null
    if (index === 0) {
      rmTarget = this.head
      this.head = this.head!.next
    } else {
      const prev = this._get_node(index - 1)
      rmTarget = prev!.next
      if (rmTarget === null) return null
      prev!.next = rmTarget!.next
    }
    this.size--
    return rmTarget
  }
  set(index: number, content: T) {
    this.validateIndex(index)
    let target: LinkNode<T> = this._get_node(index)
    target.data = content
    return target
  }
  get(index: number): LinkNode<T> | null {
    this.validateIndex(index)
    return this._get_node(index)
  }
  clear() {
    this.head = null
    this.size = 0;
  }
}
export default LinkList
// queue.ts
import LinkList from "./linkList";
class Queue<T> {
  queue: LinkList<T>;
  constructor() {
    this.queue = new LinkList()
  }
  enQueue(item: T) {
    this.queue.add(item)
  }
  deQueue() {
    if(this.isEmpty()) return null
    return this.queue.remove(0)
  }
  isEmpty() {
    return this.queue.size === 0
  }
  clear(){
    this.queue.clear()
  }
}
export default Queue
// writeStream.ts
import fs from 'fs'
import EventsEmitter from 'events'
import Queue from './queue'
interface WriteBuffer {
  chunk: ArrayBuffer
  encoding: string
  cb: Function
}
class WriteStream extends EventsEmitter {
  flags: string
  mode: number
  fd: number | null
  autoClose: any
  hightWaterMark: any
  start: any
  encoding: any
  queue: Queue<WriteBuffer>
  writing: boolean
  writeLen: number
  needDrain: boolean
  writeOffset: number
  constructor(private path, options: any) {
    super()
    this.path = path
    this.flags = options.flags || 'w'
    this.mode = options.mode || 0o666
    this.autoClose = options.autoClose || true
    this.hightWaterMark = options.highWaterMark || 16 * 1024
    this.start = options.start || 0
    this.encoding = options.encoding || 'utf8'
    this.open()
    this.queue = new Queue()
    this.writing = false
    this.writeLen = 0
    this.writeOffset = 0
    this.needDrain = false
  }
  open() {
    fs.open(this.path, this.flags, (err, fd) => {
      if (err) {
        this.emit('error', err)
      }
      this.fd = fd
      this.emit('open', fd)
    })
  }
  write(chunk: ArrayBuffer | string, encoding: string, cb: Function) {
    chunk = typeof chunk === 'string' ? Buffer.from(chunk, this.encoding) : chunk
    this.writeLen += chunk.byteLength
    let flag = this.writeLen < this.hightWaterMark
    this.needDrain = !flag
    if (this.writing) {
      this.queue.enQueue({ chunk, encoding, cb })
    } else {
      this.writing = true
      this._write(chunk, encoding, () => {
        cb()
        this._clearBuffer()
      })
    }
    return flag
  }
  private _clearBuffer() {
    let { data } = this.queue.deQueue() || {}
    if (data) {
      this._write(data.chunk, data.encoding, () => {
        data.cb && data.cb()
        this._clearBuffer()
      })
    } else {
      if (this.needDrain) {
        this.needDrain = false
        this.emit('drain')
      }
    }
  }
  _write(chunk, encoding, cb) {
    if (typeof this.fd !== 'number') {
      return this.once('open', () => this._write(chunk, encoding, cb))
    }
    fs.write(this.fd, chunk, this.start, chunk.length, this.writeOffset, (err, written) => {
      if (err) {
        this.emit('error', err)
      }
      this.writeOffset += written
      this.writeLen -= written
      cb && cb()
    })
  }
}
export default WriteStream
通信基本原理
- 
网络协议
- http 协议
 - tcp/ip 协议
 - arp 协议
 - ftp 协议
 
 - 
通信的条件
- 主机之间需要有介质:蓝牙,wifi,有线网络等
 - 主机必须有网卡设备:调制和解调,将二进制信息转换为高低电压的过程,就是调制的过程
 - 主机之间需要协商通信协议和网络速率
 
 - 
局域网通信
- 交换机:局域网内,一台主机要和另一台主机通信,需要经过交换机转发
 - 通过mac地址找到对应的设备,然后通过arp协议广播,告诉局域网内所有主机我的ip地址和mac地址的对应关系
 - 局域网之间通信,需要通过路由器转发
 
 - 
网络通信模型
 - 
OSI 七层模型
- 应用层: 用户与网络的接口
 - 表示层: 数据加密、转换、压缩
 - 会话层: 控制网络连接建立与终止
 - 传输层: 控制数据传输可靠性
 - 网络层: 确定目标网络
 - 数据链路层: 确定目标主机
 - 物理层: 各种物理设备和标准
 
 - 
Tcp/Ip 五层模型
- 封装的过程
 - 物理层 《- 数据链路层 《- 网络层 《- 传输层(Tcp) 《- 应用层(http)
 - 0110110 《- 目标Mac 源Mac 《- 目标IP 源IP 《- 目标端口 源端口 《- data
 - 封装和解封的过程是相反的
 
 - 
TCP 协议
- TCP 属于传输层协议
 - TCP 是面向连接的协议
 - TCP 用于处理实时通信
 - 常见控制字段
- SYN=1表示请求建立连接
 - FIN=1表示请求断开连接
 - ACK=1表示数据信息确认
 
 
 - 
TCP握手三次, 为啥挥手要四次
- 握手中间那次是,服务端确认链接和请求链接合并为一次,而四次挥手则是分开的。
 - 挥手是客户端与服务端断开连接的过程,中断前,服务器可能还要数据没发完,所以需要等数据发送完毕后,再继续后两次挥手
 
 
创建tcp通信
// server
import net from 'node:net'
const server = net.createServer()
server.listen(1234, 'localhost')
server.on('listening', () => {
    console.log('服务器启动成功')
})
server.on('connection', (socket) => {
    socket.on('data', (chunk) => {
        console.log(chunk.toString());
        socket.write('hello!' + chunk.toString())
    })
})
// client
import net from 'node:net'
const client = net.createConnection({
    port: 1234,
    host: 'localhost'
})
client.on('connect', () => {
    client.write('attacki')
    // buffer缓冲区,可能会导致本想分开发送的数据,粘连在一起发送
    // client.write('1')
    // client.write('2')
    // client.write('3')
    // 服务端:123
    for (let i = 0; i < 3; i++) {
        // 利用宏任务队列来解决粘连问题
        // 注意:如果是定时器相同的宏任务,仍可能粘连在一起发送
        setTimeout(() => client.write(val + ''), (i + 1) * 1000)
    }
})
client.on('data', (data) => {
    console.log(data.toString())
})
基于封包拆包解决粘包问题
class MyTransform {
    constructor(options) {
        this.packageHeaderLen = 4
        this.serialLen = 2
        this.serialNum = 0
    }
    encode(buffer, serialNum) {
        const body = Buffer.from(buffer)
        const buf = Buffer.alloc(this.packageHeaderLen)
        buf.writeInt16BE(serialNum || this.serialNum, 0)
        buf.writeInt16BE(body.length, this.serialLen)
        if (serialNum == undefined) this.serialNum++
        return Buffer.concat([buf, body])
    }
    decode(buffer) {
        const header = buffer.slice(0, this.packageHeaderLen)
        const body = buffer.slice(this.packageHeaderLen)
        return {
            serialNum: header.readInt16BE(),
            bodyLength: header.readInt16BE(this.serialLen),
            body: body.toString()
        }
    }
    getPackageLength(chunk) {
        // 包长度小于
        if (chunk.length < this.packageHeaderLen) return 0
        return chunk.readInt16BE(this.serialLen) + this.packageHeaderLen
    }
}
// server
import net from 'node:net'
import Transform from './transform.js'
const server = net.createServer()
server.listen(1234, 'localhost')
server.on('listening', () => console.log('服务器启动成功'))
const ts = new Transform()
let overRage = null
server.on('connection', (socket) => {
    socket.on('data', (chunk) => {
        if (overRage) {
            chunk = Buffer.concat([overRage, chunk])
        }
        let length = 0
        while (length = ts.getPackageLength(chunk)) {
            const packContent = chunk.slice(0, length)
            chunk = chunk.slice(length)
            const data = ts.decode(packContent)
            const serialNum = data.serialNum
            const content = 'hello! ' + data.body
            socket.write(ts.encode(content, serialNum))
        }
        overRage = chunk
    })
})
server.on('close', () => {})
server.on('error', () => {})
// client
import net from 'node:net'
import Transform from './transform.js'
const client = net.createConnection({
    port: 1234,
    host: 'localhost'
})
const ts = new Transform()
let overRage = null
client.on('connect', () => {
    client.write(ts.encode('attacki'))
    client.write(ts.encode('david'))
    client.write(ts.encode('arron'))
    client.write(ts.encode('grayson'))
    // for (let i = 0; i < 3; i++) {
    //   (function (val) {
    //     
    //     setTimeout(() => client.write(val + ''), (i + 1) * 1000)
    //   })(i)
    // }
})
client.on('data', (chunk) => {
    if (overRage) {
        chunk = Buffer.concat([overRage, chunk])
    }
    let length
    while (length = ts.getPackageLength(chunk)) {
        const packContent = chunk.slice(0, length)
        const data = ts.decode(packContent)
        chunk = chunk.slice(length)
        console.log(data)
    }
    overRage = chunk
})
client.on('close', () => {})
client.on('error', () => {})
http协议
- 缓存
 - 编码
 - 断点续传
 - 防盗链