了解NodeJs中的多线程
日期:2021-08-12 15:30:00
更新:2021-08-12 15:30:00
标签:JavaScript, node
分类:JavaScript, node
node 中的多线程是在 node 版本 v10.5.0 引入的一个的一个新特性,在很长一段时间内Worker Thread都是实验性质的,到目前为止,node 稳定的版本已经到了 14.17.4,这个特性已经变成稳定可用了。

Worker Threads
node 中的多线程是在 node 版本 v10.5.0 引入的一个的一个新特性,在很长一段时间内Worker Thread都是实验性质的,到目前为止,node 稳定的版本已经到了 14.17.4,这个特性已经变成稳定可用了。
worker_thread 是什么
先来来看看官网的描述
The `worker_threads` module enables the use of threads that
execute JavaScript in parallel.
意思就是:worker_thread模块允许使用线程来并行执行 JavaScript。
我们以前 Javascript 都是单线程然后利用一个事件循环队列(event loop)不断监听执行栈是否有函数进入。对于 worker_thread,其实可以理解为一个event loop中有多个 javascript 工作线程,创建一个线程相当于创建一个新的 js 执行环境。多线程的运行如下图

与 child_process 的区别
child_process是可以创建一个新的 node 进程,worker_thread与它的最大区别就是:worker_thread 可以共享内存,公共的数据可以在线程之间公用,而 child_process 只能通过 JSON 去传递数据。
还有就是:因为线程是在一个进程内的,创建一个线程的开销会比创建一个进程要小
worker_thread 的适用范围
worker_thread在CPU 密集型的 JS 操作中非常有用,但是在 IO 密集型操作中性能不会有太多的改善,反而 Node 自带的一步 IO 操作会比工作线程更有用。
使用 worker_thread
介绍完 worker_thread 的概念,现在来介绍一下他的用法
创建工作线程
const { isMainThread, Worker, parentPort } = require("worker_threads");
if (isMainThread) {
createChildThread();
} else {
parentPort.on("message", (msg) => {
console.log("parent thread listen:", msg);
});
parentPort.postMessage("hello child thread");
}
function createChildThread() {
const worker = new Worker(__filename);
worker.on("message", (val) => {
console.log("child thread listen:", val);
worker.postMessage("hello parent thread");
});
}
上面代码在主线程的时候调用创建工作线程的方法,创建一个工作线程并且新增一个,而创建工作线程后调用主线程的端口向工作线程发送消息,工作线程接受到消息后再向主线程回应。大概的流程图如下

执行后返回
创建线程前

创建线程后,线程数量+1


线程间通信
上文提到,worker_threads可以通过ArrayBuffer或SharedArrayBuffer共享内存。接下来看一下,它在代码中是如何实现的.
const { isMainThread, Worker, parentPort } = require("worker_threads");
if (isMainThread) {
createChildThread();
} else {
const shareBuf = new SharedArrayBuffer(4);
const bufInt = new Uint8Array(shareBuf);
parentPort.on("message", () => {
console.log("parent thread listen:", bufInt);
});
parentPort.postMessage({ bufInt });
}
function createChildThread() {
const worker = new Worker(__filename);
worker.on("message", ({ bufInt }) => {
console.log("child thread listen:", bufInt);
bufInt[0] = 11;
worker.postMessage("finished");
});
}
运行后返回,可以看到在子线程创建一个SharedArrayBuffer,用主线程广播的一个数据,在子线程中接收后赋值,因为是线程间共享的 Buffer,所以主线程这边也可以看到在子线程中修改的数据。

如图所示,使用SharedArrayBuffer创建的值会分配到共享内存中,所有线程都可以共用这块内存。

线程间通信
我们已经学会创建线程和使用共享内存了,从上面代码可以看到,线程都是从主线程中发送消息,然后子线程向主线程回复消息,没有办法让两个子线程直接直接通信,如果我想让两个子线程直接通信,那就需要用到MessageChannel这个类了,MessageChannel的具体用法可以点击这里。现在就来实现一下子线程之间的直接通信。
const {
isMainThread,
Worker,
parentPort,
MessageChannel,
threadId,
} = require("worker_threads");
if (isMainThread) {
createChildThread();
} else {
parentPort.on("message", ({ port }) => {
port.on("message", (msg) => {
console.log(`port${threadId} listen:`, msg);
});
port.postMessage(`hello, im thread ${threadId}`);
});
parentPort.postMessage("hello");
}
function createChildThread() {
const { port1, port2 } = new MessageChannel();
const worker1 = new Worker(__filename);
const worker2 = new Worker(__filename);
worker2.postMessage({ port: port1 }, [port1]);
worker1.postMessage({ port: port2 }, [port2]);
}
运行代码后返回

从代码实现可以看到,最终建立子线程直接通信的步骤还是在主线程的 message 事件中。建立通信的流程图如下

实战
了解完线程的概念和用法,现在来实战一下:比如在数组中有 100 万条数据需要 md5 加密,对比一下使用工作线程和不使用工作线程的实现速度怎么样
const {
isMainThread,
Worker,
parentPort,
threadId,
} = require("worker_threads");
const { createHash } = require("crypto");
const ARR_NUM = 1000000;
const WORKER_NUM = 1;
const size = Math.ceil(ARR_NUM / WORKER_NUM);
if (isMainThread) {
createChildThread();
} else {
parentPort.on("message", ({ status, index, startTime }) => {
if (index === WORKER_NUM) {
const usedTime = Date.now() - startTime;
console.log(`finish bussiness time: ${usedTime}ms`);
process.exit(threadId);
}
});
const data = addHasCode(threadId, size, (threadId - 1) * size);
parentPort.postMessage({
business: "finish work",
data,
});
}
function createChildThread() {
let finishNumBuf = new SharedArrayBuffer(4);
let finishNum = new Uint8Array(finishNumBuf);
const startTime = Date.now();
for (let x = 0; x < WORKER_NUM; x++) {
const worker = new Worker(__filename, {});
worker.on("message", ({ business, data }) => {
if (business === "finish work") {
finishNum[0]++;
worker.postMessage({
status: `finish worker ${x}`,
index: finishNum[0],
startTime: startTime,
data,
});
}
});
}
console.log(`${WORKER_NUM} thread start working`);
}
function addHasCode(index, size, limit) {
const result = [];
for (let x = limit, num = index * size; x < num; x++) {
result.push(createHash("md5").update("hello world").digest("base64"));
}
return result;
}
使用1个线程计算,平均需要2700ms左右

使用5个线程计算,平均需要2000ms左右

使用20个线程计算,平均需要2500ms左右

使用60个线程计算,平均需要3800ms左右

可见,线程不是越多越好,过多的线程可能会增加过多的系统开销,速度也不如单线程时候运行。
小结
本文介绍了 nodeJs 中的worker_threads的概念,去多进程的区别,和它的优点。
介绍了worker_threads是如何使用,共享内存,还有子线程之间的通信。
最后用一个测试子进程的效率的例子说明worker_threads对比单线程运行。
参考