Skip to content
Go back

Nodejs之学习笔记

Updated:  at  11:34 PM

文章目录

nodejs架构

nodejs api的测试,是基于v16.20.2版本

异步非阻塞IO 事件驱动 单线程

global 对象

path

import path from 'node:path';
import {
    dirname
} from "node:path"
import {
    fileURLToPath
} from "node:url"
const __filename = fileURLToPath(
    import.meta.url);
const __dirname = dirname(__filename);

// 获取路径中最后名称, 第二个参数用于判断是否匹配扩展名,匹配就返回名字,不匹配返回完整名称
console.log(path.basename(__filename)); // test.js
console.log(path.basename(__filename, 'js')); // test
console.log(path.basename(__filename, 'css')); // test.js
console.log(path.basename('/a/b/c')); // c
console.log(path.basename('/a/b/c/')); // c

// 获取路径中目录
console.log(path.dirname(__filename)); // /Volumes/D2/project/template/iwantit/test
console.log(path.dirname('/a/b/c/')); // /a/b

// 获取路径中扩展名称
console.log(path.extname(__filename));

// 获取路径是否是绝对路径
console.log(path.isAbsolute(__filename));

// 拼接多个路径
console.log(path.join('/a', 'b')); // /a/b

// 返回绝对路径
//  resolve([from], to)
console.log(path.resolve('./ds/d.html')); // /Volumes/D2/project/template/iwantit/test/ds/d.html

// 解析路径
console.log(path.parse(__filename));
// {
//   root: '/',
//   dir: '/Volumes/D2/project/template/iwantit/test',
//   base: 'test.js',
//   ext: '.js',
//   name: 'test'
// }

// 序列化路径
console.log(path.format({
    a: __filename,
    b: "d"
}));

// 规范化路径 处理平台差异
console.log(path.normalize(__filename));

buffer

nodejs打印buffer返回的是16进制格式,16进制可用4位二进制表示,两个16进制共占用8位,也就是一个字节

// fill 依据传入的数据重复进行填写
let b1 = Buffer.alloc(6);
b1.fill(123, 1, 5); // 从1写到5
console.log(b1); // <Buffer 00 7b 7b 7b 7b 00>  
console.log(b1.toString()); // {{{{

// write
let b1 = Buffer.alloc(6);
b1.write("123", 3, 2) // 从3开始写,写入长度为2
console.log(b1); // <Buffer 00 00 00 31 32 00>
console.log(b1.toString()); // 12

// toString 第二个参数表示从哪里开始读 第三个参数表示从哪里结束
let b1 = Buffer.from("乐观向上");
console.log(b1);
console.log(b1.toString('utf-8', 3)); // 观向上

// slice
let b1 = Buffer.from("乐观向上");
let b2 = b1.slice(3, 9);
console.log(b2);
console.log(b2.toString()); // 观向

// indexOf
let b1 = Buffer.from("乐观向上");
console.log(b1);
console.log(b1.indexOf('')); // 6

// copy
let b1 = Buffer.alloc(6);
let b2 = Buffer.from('晴天');

b2.copy(b1, 3, 3, 6); // 写入对象, 从数据源哪个位置开始, 写入起始位置,写入结束位置
console.log(b1.toString()); // 天
console.log(b2.toString()); // 晴天

b2.copy(b1);
console.log(b1.toString()); // 晴天

// concat
let b1 = Buffer.from('很远很远的');
let b2 = Buffer.from('距离');

let a = Buffer.concat([b1, b2], 9);
console.log(a.toString()) // 很远很

let b = Buffer.concat([b1, b2]);
console.log(b.toString()) // 很远很远的距离

let bol = Buffer.isBuffer(b); // true

// split

Buffer.prototype.split = function(sep) {
    let len = Buffer.from(sep).length;
    let res = [];
    let start = 0;
    let offset = 0;
    while ((offset = this.indexOf(sep, start)) !== -1) {
        res.push(this.slice(start, offset))
        start = offset + len;
    }

    res.push(this.slice(start))

    return res;
}

let b1 = Buffer.from("乐观向上");
console.log(b1.split("").map(v => v.toString()));

fs

fs.readFile(path.resolve('txt.txt'), 'utf-8', (err, data) => {
    if (!err) {
        console.log(data);

        fs.writeFile(path.resolve('txt.txt'),
            'ok', {
                mode: 438, // 八进制0o666 6*64+6*8+6 = 438  表示可读写,但不可执行
                flag: 'r+' // 用来控制如何读写,r+代表不清空写入
            },
            (err) => {
                console.log(err);
            })
    } else {
        console.log(err);
    }
})

// 追加内容
fs.appendFile('txt.txt', '乐观向上', (err) => {
    console.log('写入成功');
})

fs.copyFile('txt.txt', 'copy-txt.txt', (err) => {
    console.log('复制成功');
})
import path from 'node:path';
import fs from 'node:fs';
import browserSync from 'browser-sync';
import {
    dirname
} from "node:path"
import {
    fileURLToPath
} from "node:url"
import {
    marked
} from 'marked';

const __filename = fileURLToPath(
    import.meta.url);
const __dirname = dirname(__filename);

const template = `
<!DOCTYPE html>
<html lang="en">

<head>
 <meta charset="UTF-8">
 <meta http-equiv="X-UA-Compatible" content="IE=edge">
 <meta name="viewport" content="width=device-width, initial-scale=1.0">
 <title>Document</title>
 <style>{{style}}</style>
</head>

<body>
 {{content}}
</body>

</html>
`

let mdPath = path.resolve(__dirname, './index.md');
let cssPath = path.resolve(__dirname, './index.css');
let basename = path.basename(mdPath);
let extname = path.extname(basename);
let htmlPath = path.resolve(__dirname, basename.replace(extname, '.html'));

fs.watchFile(mdPath, (curr, prev) => {
    if (curr.mtime !== prev.mtime) {
        fs.readFile(mdPath, (err, data) => {
            if (!err) {
                const mdContent = marked.parse(data.toString());
                let contentTemp = template.replace('{{content}}', mdContent);
                fs.readFile(cssPath, (err, data) => {
                    if (!err) {
                        const cssStyle = data.toString();
                        contentTemp = contentTemp.replace('{{style}}', cssStyle);
                    };

                    fs.writeFile(htmlPath, Buffer.from(contentTemp), (err) => {
                        if (!err) {
                            console.log('文件生成成功');
                        }
                    })
                })
            }
        })
    }
})

browserSync.init({
    browser: '',
    server: __dirname,
    open: true,
    watch: true
})

fs处理文件打开和关闭

文件描述符的使用,在同一个进程中,文件关闭的时候,文件描述符会复用。

fs.open('./index.css', 'r', (err, fd) => {
    console.log('css', fd);
})

fs.open('./index.html', 'r', (err, fd) => {
    console.log('html', fd);
    fs.close(fd, (err) => {
        console.log('html 关闭成功');
        fs.open('./index.js', 'r', (err, fd) => {
            console.log('js', fd);
        })
    })
})

// css 20
// html 21
// html 关闭成功
// js 21

fs使用buffer进行文件操作

为啥要自己在buffer中进行读写?目的就是为了控制读取写入速度,如果全交给nodejs来做,速度无法控制,可能会导致数据出现问题

let buf = Buffer.alloc(10);

/**
 * 以下参数使用于 read和write方法
 * 
 * fd 定位当前被打开的文件
 * buf 用于表示当前缓冲区
 * offset 表示当前从 buf 的哪个位置开始执行写入
 * length 表示当前次写入的长度
 * position 表示当前从文件的哪个位置开始读取
 */

// read 读操作就是讲数据从磁盘文件写入到buffer中
fs.open('data.txt', 'r', (err, rfd) => {
    fs.read(rfd, buf, 1, 4, 3, (err, readBytes, data) => {
        console.log(readBytes);
        console.log(data.toString());
    })
})

// write 将缓冲区的数据写入到磁盘文件当中 
buf = Buffer.from("1234567890")
fs.open('back.txt', 'r+', (err, wfd) => {
    fs.write(wfd, buf, 0, 3, 2, (err, writtenBytes, buffer) => {
        console.log(writtenBytes);
        console.log(buffer.toString());
        fs.close(wfd)
    })
})

fs大文件传输和写入

/**
 * 1. 打开a文件,用read写入buffer
 * 2. 打开b文件,用write将buffer写入b中
 */

let bufferSize = 10;
let buf = Buffer.alloc(bufferSize);
let offset = 0;

fs.open('a.txt', 'r', (err, rfd) => {
    fs.open('b.txt', 'a+', (err, wfd) => {

        function next() {
            fs.read(rfd, buf, 0, bufferSize, offset, (err, readBytes, data) => {
                if (!readBytes) {
                    fs.close(rfd);
                    fs.close(wfd);
                    return
                }
                fs.write(wfd, buf, 0, readBytes, offset, (err, writtenBytes) => {
                    console.log('写入成功');
                    offset += readBytes;
                    next();
                })
            })
        }

        next();
    })
})

fs操作目录

fs.access('data.txt', (err) => !err && (console.log('有权限')))

fs.stat('data.txt', (err, statObj) => {
    console.log(statObj.size);
    console.log(statObj.isFile());
    console.log(statObj.isDirectory());
})

// 默认非递归创建
fs.mkdir('a/b/c', {
    recursive: true
}, (err) => {
    if (err) {
        console.log('创建失败');
    } else {
        console.log('创建成功');
    }
})

// 默认只能删除空目录 递归为true可以删除非空目录
fs.rmdir('a/b/c', {
    recursive: true
}, (err) => {
    if (err) {
        console.log('删除失败');
    } else {
        console.log('删除成功');
    }
})

// 读取目录
fs.readdir('node_modules/.bin', (err, files) => {
    console.log(files);
})

// unlink
fs.unlink('a.txt', (err) => {
    console.log(err);
    if (err) {
        console.log('删除失败');
    } else {
        console.log('删除成功');
    }
})

fs创建目录、删除目录

/**
 * 1. 将来调用时需要接受类似 a/b/c,路径以 / 连接
 * 2. 利用 / 进行分割,然后生成数组 ['a', 'b', 'c']
 * 3. 对上述数组进行便利,我们需要拿到每一项,然后与前一项进行拼接
 * 4. 判断拼接的路径是否可操作,有则存在,否则需创建
 */

function makeDirSync(dirPath) {
    let items = dirPath.split(path.sep)
    for (let i = 1; i <= items.length; i++) {
        let dir = items.slice(0, i).join(path.sep)
        try {
            fs.accessSync(dir)
        } catch (error) {
            fs.mkdirSync(dir)
        }
    }
}

makeDirSync('a/b/c')

// 删除目录
async function mkDirAsync(dirPath, cb) {
    const items = dirPath.split(path.sep);
    for (let i = 1; i <= items.length; i++) {
        const dir = items.slice(0, i).join(path.sep);
        try {
            await access(dir);
        } catch (error) {
            await mkdir(dir);
        }
    }
    cb && cb()
}

mkDirAsync('a/b/c')

function rmDirAsync(dirPath, cb) {
    // 判断path类型
    fs.stat(dirPath, (err, statObj) => {
        if (statObj.isDirectory()) {
            fs.readdir(dirPath, (err, files) => {
                console.log(files);

                const dirs = files.map(item => {
                    return path.join(dirPath, item)
                })

                let i = 0;

                function next() {
                    if (i >= dirs.length) return fs.rmdir(dirPath, cb);
                    rmDirAsync(dirs[i++], next)
                }

                next();
            })
        } else {
            fs.unlink(dirPath, cb)
        }
    })
}
rmDirAsync('a/b', () => {
    console.log('删除成功了');
})

组件化前端开发

commonjs

module

module.exports 和 exports 区别

模块加载流程

vm模块

1. runInThisContext

// test.txt
var ttt = 'test vm'

// test.js
import vm from 'node:vm';
import fs from 'node:fs';

let content = fs.readFileSync('./test.txt', 'utf-8');
vm.runInThisContext(content);
console.log(ttt);
console.log(global.ttt); // test.txt的内容运行后,ttt被设置在全局对象上了
// test vm  
// test vm  

let localVar = 'initial value';
const vmResult = vm.runInThisContext('localVar = "vm";');
console.log(`vmResult: '${vmResult}', localVar: '${localVar}'`);
// vmResult: 'vm', localVar: 'initial value'

2. runInContext

const vm = require('node:vm');

const contextObject = {
    globalVar: 1
};
vm.createContext(contextObject);

for (let i = 0; i < 10; ++i) {
    vm.runInContext('globalVar *= 2;', contextObject);
}
console.log(contextObject);
// Prints: { globalVar: 1024 }

模块加载函数实现

// 路径分析
// 缓存优化
// 文件定位
// 编译执行

事件模块

EventEmitter实现

// on
// emit
// once
// off

浏览器事件环执行顺序

Nodejs事件环

Stream模块

Nodejs的流分类

Node.js 的可读流(Readable Stream)内部通过复杂的状态管理和缓冲区机制实现了高效的数据处理。以下是其核心原理和关键问题的解析: 一、可读流内部数据管理机制

  1. 缓冲区的创建与数据插入
    • 可读流通过 _read 方法从底层数据源(如文件、网络)读取数据,并调用 push 方法将数据块插入内部缓冲区。 • 缓冲区使用链表结构(如 BufferList )实现,支持高效的数据插入和移除操作。链表每个节点存储一个数据块(Buffer 或字符串),通过 highWaterMark 参数控制缓冲区最大容量。 • 当调用 push 时,数据会被追加到链表尾部,并更新 length 属性记录当前缓冲区总字节数。若缓冲区未满(未达 highWaterMark ),流会继续触发 _read 以加载更多数据。

  2. 数据读取的触发条件
    流动模式(Flowing Mode):通过监听 data 事件或调用 pipe 方法,流会自动从缓冲区中拉取数据并触发事件。此时数据会被连续消费,直到缓冲区为空或达到 highWaterMark 限制。 • 暂停模式(Paused Mode):通过监听 readable 事件,需手动调用 read() 方法逐块读取数据。每次读取后,流会根据剩余缓冲区空间决定是否触发新的 _read 调用。


二、 readabledata 事件的切换原理

  1. 模式切换的触发方式
    切换到流动模式

    ◦ 添加 data 事件监听器; ◦ 调用 resume() 方法; ◦ 使用 pipe 方法连接可写流。

切换回暂停模式

◦ 移除所有 `data` 事件监听器并调用 `pause()` ;
◦ 调用 `unpipe()` 断开管道。

2. 底层状态管理
• 可读流通过内部状态对象 ReadableState 跟踪模式变化,关键字段包括:

◦ `flowing` :标识当前模式( `null` 初始态, `true` 流动模式, `false` 暂停模式);
◦ `needReadable` :标记是否需要触发 `readable` 事件;
◦ `reading` :标识是否正在从底层读取数据。

示例流程:当 data 事件触发时,流会进入流动模式,循环调用 read() 清空缓冲区;若缓冲区数据量低于 highWaterMark ,则触发 _read 填充新数据。


三、关键设计细节

  1. 背压控制(Backpressure)
    • 当可写流处理速度慢于可读流时,缓冲区达到 highWaterMark ,流会暂停 _read 调用,防止内存溢出。 • 通过 writable.write() 返回的布尔值判断下游处理能力,动态调整数据生产节奏。

  2. 事件循环与异步处理
    • 数据读取和事件触发通过 Node.js 事件循环异步调度。例如,在流动模式中, data 事件可能批量触发以减少上下文切换开销。

import {
    Duplex,
    Transform,
} from 'node:stream'
import fs from 'fs'

class MyTransform extends Transform {
    constructor() {
        super()
    }

    _transform(chunk, encoding, next) {
        this.push(chunk.toString().toUpperCase())
        next()
    }
}

const myTransform = new MyTransform()
myTransform.write("d")
myTransform.write("e")
myTransform.write("f")
myTransform.on('data', (chunk) => console.log(`Received chunk: ${chunk}`))

// Received chunk: D
// Received chunk: E
// Received chunk: F

可读流

let rs = fs.createReadStream('./test.txt', {
    flags: 'r',
    encoding: null,
    fd: null,
    mode: 0o666,
    autoClose: true,
    start: 0,
    // end: 13,
    highWaterMark: 5, // 控制底层数据分片大小(每次读取3字节)
})

rs.on('data', (chunk) => {
    console.log(chunk.toString())
    rs.pause()
    setTimeout(() => {
        rs.resume()
    }, 1000)
})

rs.on('readable', () => {
    console.log('触发readable事件,当前缓冲区长度:', rs._readableState.length);
    let chunk
    while ((chunk = rs.read(3)) !== null) {
        console.log('读取到数据:', chunk.toString());
        console.log('读取后缓冲区长度:', rs._readableState.length);
    }
    // 触发readable事件,当前缓冲区长度: 5
    // 读取到数据: 012
    // 读取后缓冲区长度: 2
    // 触发readable事件,当前缓冲区长度: 7
    // 读取到数据: 345
    // 读取后缓冲区长度: 4
    // 读取到数据: 678
    // 读取后缓冲区长度: 1
    // 触发readable事件,当前缓冲区长度: 1
    // 读取到数据: 9
    // 读取后缓冲区长度: 0
})

rs.on('open', () => {
    console.log('文件打开成功')
})

rs.on('close', () => {
    // 可读流只要读了数据,就会触发close事件
    console.log('文件关闭')
})

rs.on('end', () => {
    // 缓冲区中没有数据时,才会触发end事件
    // 适合在这里来整合数据,比如写入到数据库等操作
    console.log('读取结束')
})

rs.on('error', (err) => {
    console.log('读取错误', err)
})

可写流

import fs from 'fs'

const ws = fs.createWriteStream('output.txt', {
    flags: 'w',
    fd: null,
    autoClose: true,
    mode: 0o666,
    encoding: 'utf8',
    start: 0,
    highWaterMark: 4
})

ws.write('hello ', () => {
    console.log('写入完成1')
})

ws.write('world', () => {
    console.log('写入完成2')
})

ws.on('open', () => {
    console.log('文件打开了')
})

ws.on('error', () => {
    console.log('xxxxxx错误发生')
})

// Calling the writable.end() method signals that no more data will be written to the Writable
ws.end('!')

// 文件关闭事件,需要触发end事件才会触发close事件
ws.on('close', () => {
    console.log('文件关闭了')
})

理解writeable的缓冲机制

import fs from 'node:fs'

const ws = fs.createWriteStream('./output.txt', {
    highWaterMark: 4
})

async function write(chunk) {
    // write执行流程
    let flag = ws.write('1')
    console.log(flag) // true

    flag = ws.write('2')
    console.log(flag) // true

    // flag为false,不代表写入失败,而是表示缓冲区已满,需要等待drain事件触发后再继续写入
    flag = ws.write('3')
    console.log(flag) // false

    // 以上同步代码的写入操作,会将数据放入缓冲区,实际写入操作回调,被放入当前宏任务的微任务队列中

    const p1 = new Promise((resolve) => {
        setTimeout(() => {
            // 2s后写入数据4到文件中
            ws.write('4')
            resolve()
        }, 2000)
    })

    // 4的写入是另一个包含宏任务的微任务
    await p1

    // 但对于另一个宏任务中的写入操作,会在3秒后第二次写入数据5到文件中
    setTimeout(() => {
        // 因为await了2s,所有是4s后写入数据5到文件中
        ws.write('5')
    }, 2000)

    // 监听drain事件,当缓冲区再次可用时,触发该事件
    ws.on('drain', () => {
        console.log('drain')
    })
}

write()

// [email protected]版本
// 1. 创建可写流,设置高水位线为4,流对象初始化缓冲区及参数,注册流销毁及错误事件回调,保证
// 2. 每次调用write方法,将数据写入到缓冲区
// 3. 生产速度一般远大于消费速度,缓冲区会逐渐被填满
// 4. 第三次调用write方法,flag返回false,缓冲区已到达警戒线,需要手动限制继续写入数据到缓冲区,强行写入是被允许的,但是不推荐这么做
// 5. 生产者监听drain事件,当缓冲区再次可用时,触发drain事件,继续写入数据
// 6. 每完成一次宏任务,会执行一次写入操作

理解drain事件的作用

import fs from 'node:fs'
import {
    truncate
} from 'node:fs/promises'

const ws = fs.createWriteStream('./output.txt', {
    highWaterMark: 3
})
const data = '我很努力'

// ws.write(data) // 一次性写入,可能会有性能问题
let flag = true
let index = 0

function write() {
    flag = true
    while (flag && index < data.length) {
        flag = ws.write(data[index])
        index++
    }
}

write()

ws.on('drain', () => {
    console.log('drain triggered'); // 触发4次
    flag = true
    write()
})

模拟可读流及pipe的实现

import fs from 'node:fs'
import EventEmitter from 'node:events'
class MyReadStream extends EventEmitter {
    constructor(options) {
        super(options)
        this.path = options.path
        this.flag = options.flag || 'r'
        this.mode = options.mode || 0o666
        this.autoClose = options.autoClose || true
        this.start = options.start || 0
        this.end = options.end || null
        this.highWaterMark = options.highWaterMark || 64 * 1024
        this.fd = null
        this.readOffset = 0
        this.open()
        this.on('newListener', (event, listener) => {
            if (event === 'data') {
                this.read()
            }
        })
        this.drained = false

    }

    open() {
        fs.open(this.path, this.flag, this.mode, (err, fd) => {
            if (err) {
                this.emit('error', err)
            } else {
                this.fd = fd
                this.emit('open', this.fd)
            }
        })
    }
    read() {
        if (this.drained) return
        if (this.fd === null) return this.once('open', () => this.read())
        let buf = Buffer.alloc(this.highWaterMark)
        let howMuchToRead = this.end ? Math.min(this.end - this.start + 1, this.highWaterMark) : this.highWaterMark

        fs.read(this.fd, buf, this.start, howMuchToRead, this.readOffset, (err, bytesRead) => {
            if (err) {
                this.emit('error', err)
            } else if (bytesRead === 0) {
                this.emit('end')
            } else {
                let data = buf.slice(0, bytesRead)
                this.readOffset += bytesRead
                this.emit('data', data)
                process.nextTick(() => this.read())
            }
        })

    }
    close() {
        if (this.fd === null) return this.once('open', () => this.close())
        fs.close(this.fd, err => {
            if (err) {
                this.emit('error', err)
            } else {
                this.fd = null
                this.emit('close')
            }
        })
    }

    pipe(ws) {
        this.on('data', (chunk) => {
            const ret = ws.write(chunk)
            console.log(ret);

            if (!ret) this.pause()
        })

        ws.on('drain', () => {
            this.resume()
        })
    }

    pause() {
        this.drained = true
    }
    resume() {
        this.drained = false;
        this.read()
    }
}

const rs = new MyReadStream({
    path: './output.txt',
    highWaterMark: 3
})
const ws = fs.createWriteStream('./output2.txt', {
    highWaterMark: 1
})

rs.on('data', (chunk) => {
    console.log(chunk.toString())
})

rs.on('end', () => {
    console.log('文件读取完毕')
})

rs.pipe(ws)

单向链表的实现,模拟可写流实现

// linkList.ts
class LinkNode<T> {
  data: T
  next: LinkNode<T> | null = null

  constructor(data: T, next?: LinkNode<T> | null) {
    this.data = data
    this.next = next || null
  }
}

class LinkList<T> {
  head: LinkNode<T> | null = null
  size: number = 0
  constructor() { }

  validateIndex(index: number, checkRange: boolean = true) {
    if (index < 0) {
      throw new Error('Index must be greater than or equal to 0');
    }

    if (checkRange && index >= this.size) {
      throw new Error('Index must be less than the size of the list');
    }

    return true;
  }

  private _get_node(index: number): LinkNode<T> {
    let curNode = this.head!
    for (let curIndex = 0; curIndex < index; curIndex++) {
      curNode = curNode.next!
    }
    return curNode
  }

  add(content: T)
  add(index: number, content: T)
  add(indexOrContent: number | T, content?: T) {
    if (typeof indexOrContent !== 'number') {
      return this.add(0, indexOrContent);
    }

    const index = indexOrContent;
    this.validateIndex(index, false);

    let target: LinkNode<T> | null = null

    if (index === 0) {
      target = new LinkNode<T>(content!, this.head)
      this.head = target
    } else {
      const prev = this._get_node(index - 1)
      target = new LinkNode<T>(content!, prev.next)
      prev.next = target
    }

    this.size++
    return target
  }

  remove(index: number): LinkNode<T> | null {
    this.validateIndex(index)

    let rmTarget: LinkNode<T> | null = null

    if (index === 0) {
      rmTarget = this.head
      this.head = this.head!.next
    } else {
      const prev = this._get_node(index - 1)
      rmTarget = prev!.next

      if (rmTarget === null) return null

      prev!.next = rmTarget!.next
    }

    this.size--
    return rmTarget
  }

  set(index: number, content: T) {
    this.validateIndex(index)

    let target: LinkNode<T> = this._get_node(index)
    target.data = content
    return target
  }

  get(index: number): LinkNode<T> | null {
    this.validateIndex(index)
    return this._get_node(index)
  }

  clear() {
    this.head = null
    this.size = 0;
  }
}

export default LinkList

// queue.ts
import LinkList from "./linkList";

class Queue<T> {
  queue: LinkList<T>;
  constructor() {
    this.queue = new LinkList()
  }

  enQueue(item: T) {
    this.queue.add(item)
  }

  deQueue() {
    if(this.isEmpty()) return null
    return this.queue.remove(0)
  }

  isEmpty() {
    return this.queue.size === 0
  }

  clear(){
    this.queue.clear()
  }

}

export default Queue

// writeStream.ts
import fs from 'fs'
import EventsEmitter from 'events'
import Queue from './queue'

interface WriteBuffer {
  chunk: ArrayBuffer
  encoding: string
  cb: Function
}

class WriteStream extends EventsEmitter {
  flags: string
  mode: number
  fd: number | null
  autoClose: any
  hightWaterMark: any
  start: any
  encoding: any
  queue: Queue<WriteBuffer>
  writing: boolean
  writeLen: number
  needDrain: boolean
  writeOffset: number

  constructor(private path, options: any) {
    super()
    this.path = path
    this.flags = options.flags || 'w'
    this.mode = options.mode || 0o666
    this.autoClose = options.autoClose || true
    this.hightWaterMark = options.highWaterMark || 16 * 1024
    this.start = options.start || 0
    this.encoding = options.encoding || 'utf8'

    this.open()
    this.queue = new Queue()
    this.writing = false
    this.writeLen = 0
    this.writeOffset = 0
    this.needDrain = false

  }
  open() {
    fs.open(this.path, this.flags, (err, fd) => {
      if (err) {
        this.emit('error', err)
      }
      this.fd = fd
      this.emit('open', fd)
    })
  }

  write(chunk: ArrayBuffer | string, encoding: string, cb: Function) {
    chunk = typeof chunk === 'string' ? Buffer.from(chunk, this.encoding) : chunk
    this.writeLen += chunk.byteLength

    let flag = this.writeLen < this.hightWaterMark
    this.needDrain = !flag

    if (this.writing) {
      this.queue.enQueue({ chunk, encoding, cb })
    } else {
      this.writing = true
      this._write(chunk, encoding, () => {
        cb()
        this._clearBuffer()
      })
    }

    return flag
  }
  private _clearBuffer() {
    let { data } = this.queue.deQueue() || {}
    if (data) {
      this._write(data.chunk, data.encoding, () => {
        data.cb && data.cb()
        this._clearBuffer()
      })
    } else {
      if (this.needDrain) {
        this.needDrain = false
        this.emit('drain')
      }
    }

  }

  _write(chunk, encoding, cb) {
    if (typeof this.fd !== 'number') {
      return this.once('open', () => this._write(chunk, encoding, cb))
    }

    fs.write(this.fd, chunk, this.start, chunk.length, this.writeOffset, (err, written) => {
      if (err) {
        this.emit('error', err)
      }
      this.writeOffset += written
      this.writeLen -= written

      cb && cb()
    })
  }
}

export default WriteStream

通信基本原理

创建tcp通信

// server
import net from 'node:net'

const server = net.createServer()

server.listen(1234, 'localhost')

server.on('listening', () => {
    console.log('服务器启动成功')
})

server.on('connection', (socket) => {
    socket.on('data', (chunk) => {
        console.log(chunk.toString());

        socket.write('hello!' + chunk.toString())
    })
})

// client
import net from 'node:net'

const client = net.createConnection({
    port: 1234,
    host: 'localhost'
})

client.on('connect', () => {
    client.write('attacki')

    // buffer缓冲区,可能会导致本想分开发送的数据,粘连在一起发送
    // client.write('1')
    // client.write('2')
    // client.write('3')
    // 服务端:123
    for (let i = 0; i < 3; i++) {
        // 利用宏任务队列来解决粘连问题
        // 注意:如果是定时器相同的宏任务,仍可能粘连在一起发送
        setTimeout(() => client.write(val + ''), (i + 1) * 1000)
    }

})

client.on('data', (data) => {
    console.log(data.toString())
})

基于封包拆包解决粘包问题

class MyTransform {
    constructor(options) {
        this.packageHeaderLen = 4
        this.serialLen = 2
        this.serialNum = 0
    }
    encode(buffer, serialNum) {
        const body = Buffer.from(buffer)
        const buf = Buffer.alloc(this.packageHeaderLen)
        buf.writeInt16BE(serialNum || this.serialNum, 0)
        buf.writeInt16BE(body.length, this.serialLen)
        if (serialNum == undefined) this.serialNum++
        return Buffer.concat([buf, body])
    }

    decode(buffer) {
        const header = buffer.slice(0, this.packageHeaderLen)
        const body = buffer.slice(this.packageHeaderLen)

        return {
            serialNum: header.readInt16BE(),
            bodyLength: header.readInt16BE(this.serialLen),
            body: body.toString()
        }
    }

    getPackageLength(chunk) {
        // 包长度小于
        if (chunk.length < this.packageHeaderLen) return 0
        return chunk.readInt16BE(this.serialLen) + this.packageHeaderLen
    }
}

// server
import net from 'node:net'
import Transform from './transform.js'

const server = net.createServer()
server.listen(1234, 'localhost')

server.on('listening', () => console.log('服务器启动成功'))
const ts = new Transform()
let overRage = null

server.on('connection', (socket) => {
    socket.on('data', (chunk) => {
        if (overRage) {
            chunk = Buffer.concat([overRage, chunk])
        }
        let length = 0
        while (length = ts.getPackageLength(chunk)) {
            const packContent = chunk.slice(0, length)
            chunk = chunk.slice(length)
            const data = ts.decode(packContent)
            const serialNum = data.serialNum
            const content = 'hello! ' + data.body
            socket.write(ts.encode(content, serialNum))
        }
        overRage = chunk
    })
})

server.on('close', () => {})
server.on('error', () => {})

// client
import net from 'node:net'
import Transform from './transform.js'

const client = net.createConnection({
    port: 1234,
    host: 'localhost'
})

const ts = new Transform()
let overRage = null

client.on('connect', () => {
    client.write(ts.encode('attacki'))
    client.write(ts.encode('david'))
    client.write(ts.encode('arron'))
    client.write(ts.encode('grayson'))
    // for (let i = 0; i < 3; i++) {
    //   (function (val) {
    //     
    //     setTimeout(() => client.write(val + ''), (i + 1) * 1000)
    //   })(i)
    // }
})

client.on('data', (chunk) => {
    if (overRage) {
        chunk = Buffer.concat([overRage, chunk])
    }

    let length
    while (length = ts.getPackageLength(chunk)) {
        const packContent = chunk.slice(0, length)
        const data = ts.decode(packContent)
        chunk = chunk.slice(length)
        console.log(data)
    }
    overRage = chunk
})

client.on('close', () => {})
client.on('error', () => {})

http协议



Previous Post
C++之学习之旅
Next Post
Javascript之尾调优化