Node.js 是如何处理请求的
前言:在服务器软件中,如何处理请求是非常核心的问题。不管是底层架构的设计、IO 模型的选择,还是上层的处理都会影响一个服务器的性能,本文介绍 Node.js 在这方面的内容。
TCP 协议的核心概念
要了解服务器的工作原理首先需要了解 TCP 协议的工作原理。TCP 是一种面向连接的、可靠的、基于字节流的传输层全双工通信协议,它有 4 个特点:面向连接、可靠、流式、全双工。下面详细讲解这些特性。
面向连接
TCP 中的连接是一个虚拟的连接,本质上是主机在内存里记录了对端的信息,我们可以将连接理解为一个通信的凭证。如下图所示。
那么如何建立连接呢?TCP 的连接是通过三次握手建立的。
- 服务器首先需要监听一个端口。
- 客户端主动往服务器监听的端口发起一个 syn 包(第一次握手)。
- 当服务器所在操作系统收到一个 syn 包时,会先根据 syn 包里的目的 IP 和端口找到对应的监听 socket,如果找不到则回复 rst 包,如果找到则发送 ack 给客户端(第二次握手),接着新建一个通信 socket 并插入到监听 socket 的连接中队列(具体的细节会随着不同版本的操作系统而变化。比如连接中队列和连接完成队列是一条队列还是两条队列,再比如是否使用了 syn cookie 技术来防止 syn flood 攻击,如果使用了,收到 syn 包的时候就不会创建 socket,而是收到第三次握手的包时再创建)。
- 客户端收到服务器的 ack 后,再次发送 ack 给服务器,客户端就完成三次握手进入连接建立状态了。
- 当服务器所在操作系统收到客户端的 ack 时(第三次握手),处于连接中队列的 socket 就会被移到连接完成队列中。
- 当操作系统完成了一个 TCP 连接,操作系统就会通知相应的进程,进程从连接完成队列中摘下一个已完成连接的 socket 结点,然后生成一个新的 fd,后续就可以在该 fd 上和对端通信。具体的流程如下图所示。
完成三次握手后,客户端和服务器就可以进行数据通信了。操作系统收到数据包和收到 syn 包的流程不一样,操作系统会根据报文中的 IP 和端口找到处理该报文的通信 socket(而不是监听 socket),然后把数据包(操作系统实现中是一个 skb 结构体)挂到该通信 socket 的数据队列中。
当应用层调用 read 读取该 socket 的数据时,操作系统会根据应用层所需大小,从一个或多个 skb 中返回对应的字节数。同样,写也是类似的流程,当应用层往 socket 写入数据时,操作系统不一定会立刻发送出去,而是会保存到写缓冲区中,然后根据复杂的 TCP 算法发送。
当两端完成通信后需要关闭连接,否则会浪费内存。TCP 通过四次挥手实现连接的断开,第一次挥手可以由任意一端发起。前面讲过 TCP 是全双工的,所以除了通过四次挥手完成整个 TCP 连接的断开外,也可以实现半断开,比如客户端关闭写端表示不会再发送数据,但是仍然可以读取来自对端发送端数据。四次挥手的流程如下。
可靠
TCP 发送数据时会先缓存一份到已发送待确认队列中,并启动一个超时重传计时器,如果一定时间内没有收到对端到确认 ack,则触发重传机制,直到收到 ack 或者重传次数达到阈值才会结束流程。
流式
建立连接后,应用层就可以调用发送接口源源不断地发送数据。通常情况下,并不是每次调用发送接口,操作系统就直接把数据发送出去,这些数据的发送是由操作系统按照一定的算法去发送的。对操作系统来说,它看到的是字节流,它会按照 TCP 算法打包出一个个包发送到对端,所以当对端收到数据后,需要处理好数据边界的问题。
从上图中可以看到,假设应用层发送了两个 HTTP 请求,操作系统在打包数据发送时可能的场景是第一个包里包括了 HTTP 请求 1 的全部数据和部分请求 2 的数据,所以当对端收到数据并进行解析时,就需要根据 HTTP 协议准确地解析出第一个 HTTP 请求对应的数据。
因为 TCP 的流式协议,所以基于 TCP 的应用层通常需要定义一个应用层协议,然后按照应用层协议实现对应的解析器,这样才能完成有效的数据通信,比如常用的 HTTP 协议。对比来说 UDP 是面向数据包的协议,当应用层把数据传递给 UDP 时,操作系统会直接打包发送出去(如果数据字节大小超过阈值则会报错)。
全双工
刚才提到 TCP 是全双工的,全双工就是通信的两端都有一个发送队列和接收队列,可以同时发送和接收,互不影响。另外也可以选择关闭读端或者写端。
服务器的工作原理
介绍了 TCP 协议的概念后,接着看看如何创建一个 TCP 服务器(伪代码)。
// 创建一个 socket,拿到一个文件描述符
int server_fd = socket();
// 绑定地址(IP + 端口)到该 socket 中
bind(server_fd, addressInfo);
// 修改 socket 为监听状态,这样就可以接收 TCP 连接了
listen(server_fd);
执行完以上步骤,一个服务器就启动了。服务器启动的时候会监听一个端口,如果有连接到来,我们可以通过 accept 系统调用拿到这个新连接对应的 socket,那这个 socket 和监听的 socket 是不是同一个呢?
其实 socket 分为监听型和通信型。表面上服务器用一个端口实现了多个连接,但是这个端口是用于监听的,底层用于和客户端通信的其实是另一个 socket。每当一个连接到来的时候,操作系统会根据请求包的目的地址信息找到对应的监听 socket,如果找不到就会回复 RST 包,如果找到就会生成一个新的 socket 与之通信(accept 的时候返回的那个)。监听 socket 里保存了监听的 IP 和端口,通信 socket 首先从监听 socket 中复制 IP 和端口,然后把客户端的 IP 和端口也记录下来。这样一来,下次再收到一个数据包,操作系统就会根据四元组从 socket 池子里找到该 socket,完成数据的处理。因此理论上,一个服务器能接受多少连接取决于服务器的硬件配置,比如内存大小。
接下来分析各种处理连接的方式。
串行模式
串行模式就是服务器逐个处理连接,处理完前面的连接后才能继续处理后面的连接,逻辑如下。
while(1) {int client_fd = accept(server_fd);read(client_fd);write(client_fd);
}
上面的处理方式是最朴素的模型,如果没有连接,则服务器处于阻塞状态,如果有连接服务器就会不断地调用 accept 摘下完成三次握手的连接并处理。假设此时有 n 个请求到来,进程会从 accept 中被唤醒,然后拿到一个新的 socket 用于通信,结构图如下。
这种处理模式下,如果处理的过程中调用了阻塞 API,比如文件 IO,就会影响后面请求的处理,可想而知,效率是非常低的,而且,并发量比较大的时候,监听 socket 对应的队列很快就会被占满(已完成连接队列有一个最大长度),导致后面的连接无法完成。这是最简单的模式,虽然服务器的设计中肯定不会使用这种模式,但是它让我们了解了一个服务器处理请求的整体过程。
多进程模式
串行模式中,所有请求都在一个进程中排队被处理,效率非常低下。为了提高效率,我们可以把请求分给多个进程处理。因为在串行处理的模式中,如果有文件 IO 操作就会阻塞进程,继而阻塞后续请求的处理。在多进程的模式中,即使一个请求阻塞了进程,操作系统还可以调度其它进程继续执行新的任务。多进程模式分为几种。
按需 fork
按需 fork 模式是主进程监听端口,有连接到来时,主进程执行 accept 摘取连接,然后通过 fork 创建子进程处理连接,逻辑如下。
while(1) { int client_fd = accept(socket); // 忽略出错处理 if (fork() > 0) { continue;// 父进程负责 accept } else { // 子进程负责处理连接handle(client_fd); exit(); }
}
这种模式下,每次来一个请求,就会新建一个进程去处理,比串行模式稍微好了一点,每个请求都被独立处理。假设 a 请求阻塞在文件 IO,不会影响 b 请求的处理,尽可能地做到了并发。它的缺点是
- 进程数有限,如果有大量的请求,需要排队处理。
- 进程的开销会很大,对于系统来说是一个负担。
- 创建进程需要时间,实时创建会增加处理请求的时间。
pre-fork 模式 + 主进程 accept
pre-fork 模式就是服务器启动的时候,预先创建一定数量的进程,但是这些进程是 worker 进程,不负责接收连接,只负责处理请求。处理过程为主进程负责接收连接,然后把接收到的连接交给 worker 进程处理,流程如下。
逻辑如下:
let fds = [[], [], [], …进程个数];
let process = [];
for (let i = 0 ; i < 进程个数; i++) { // 创建管道用于传递文件描述符 socketpair(fds[i]); let pid; if (pid = fork() > 0) { // 父进程 process.push({pid, 其它字段}); } else { const index = i; // 子进程处理请求 while(1) { // 从管道中读取文件描述符 var client_fd = read(fd[index][1]); // 处理请求 handle(client_fd); } }
}
// 主进程 accept
for (;;) { const clientFd = accept(socket); // 找出处理该请求的子进程 const i = findProcess(); // 传递文件描述符 write(fds[i][0], clientFd);
}
和 fork 模式相比,pre-fork 模式相对比较复杂,因为在前一种模式中,主进程收到一个请求就会实时 fork 一个子进程,这个子进程会继承主进程中新请求对应的 fd,可以直接处理该 fd 对应的请求。但是在进程池的模式中,子进程是预先创建的,当主进程收到一个请求的时候,子进程中无法拿得到该请求对应的 fd 。这时候就需要主进程使用传递文件描述符的技术把这个请求对应的 fd 传给子进程。
pre-fork 模式 + 子进程 accept
刚才介绍的模式中,是主进程接收连接,然后传递给子进程处理,这样主进程就会成为系统的瓶颈,它可能来不及接收和分发请求给子进程,而子进程却很空闲。子进程 accept 这种模式也是会预先创建多个进程,区别是多个子进程会调用 accept 共同处理请求,而不需要父进程参与,逻辑如下。
int server_fd = socket();
bind(server_fd);
for (let i = 0 ; i < 进程个数; i++) { if (fork() > 0) { // 父进程负责监控子进程 } else { // 子进程处理请求 listen(server_fd);while(1) { int client_fd = accept(socket); handle(client_fd); } }
}
这种模式下多个子进程都阻塞在 accept,如果这时候有一个请求到来,那么所有的子进程都会被唤醒,但是先被调度的子进程会摘下这个请求节点,后续的进程被唤醒后可能会遇到已经没有请求可以处理,而又进入睡眠,这种进程被无效唤醒的现象就是著名的惊群现象。这种模式的处理流程如下。
Nginx 中解决了惊群这个问题,它的处理方式是在 accpet 之前先加锁,拿到锁的进程才进行 accept,这样就保证了只有一个进程会阻塞在 accept,不会引起惊群问题,但是新版操作系统已经在内核层面解决了这个问题,每次只会唤醒一个进程。
多线程模式
除了使用多进程外,也可以使用多线程技术处理连接,多线程模式和多进程模式类似,区别是在进程模式中,每个子进程都有自己的 task_struct,这就意味着在 fork 之后,每个进程负责维护自己的数据、资源。线程则不一样,线程共享进程的数据和资源,所以连接可以在多个线程中共享,不需要通过文件描述符传递的方式进行处理,比如如下架构。
上图中,主线程负责 accept 请求,然后通过互斥的方式插入一个任务到共享队列中,线程池中的子线程同样是通过互斥的方式,从共享队列中摘取节点进行处理。
事件驱动
从之前的处理模式中我们知道,为了应对大量的请求,服务器通常需要大量的进程 / 线程,这是个非常大的开销。现在很多服务器(Nginx、Nodejs、Redis)都开始使用单进程 + 事件驱动模式去设计,这种模式可以在单个进程中轻松处理成千上万的请求。
但也正因为单进程模式下,再多的请求也只在一个进程里处理,这样一个任务会一直在占据 CPU,后续的任务就无法执行了。因此,事件驱动模式不适合 CPU 密集型的场景,更适合 IO 密集的场景(一般都会提供线程 / 线程池,负责处理 CPU 或者阻塞型的任务)。大部分操作系统都提供了事件驱动的 API,但是事件驱动在不同系统中实现不一样,所以一般都会有一层抽象层抹平这个差异。这里以 Linux 的 epoll 为例子。
// 创建一个 epoll 实例
int epoll_fd = epoll_create();
/* 在 epoll 给某个文件描述符注册感兴趣的事件,这里是监听的 socket,注册可读事件,即连接到来 event = { event: 可读 fd: 监听 socket // 一些上下文 }
*/
epoll_ctl(epoll_fd , EPOLL_CTL_ADD , socket, event);
while(1) { // 阻塞等待事件就绪,events 保存就绪事件的信息,total 是个数 int total= epoll_wait(epoll_fd , 保存就绪事件的结构events, 事件个数, timeout); for (let i = 0; i < total; i++) { if (events[fd] === 监听 socket) { int client_fd = accpet(socket); // 把新的 socket 也注册到 epoll,等待可读,即可读取客户端数据 epoll_ctl(epoll_fd , EPOLL_CTL_ADD , client_fd, event); } else { // 从events[i] 中拿到一些上下文,执行相应的回调 } }
}
事件驱动模式的处理流程为服务器注册文件描述符和事件到 epoll 中,然后 epoll 开始阻塞,当有事件触发时 epoll 就会返回哪些 fd 的哪些事件触发了,接着服务器遍历就绪事件并执行对应的回调,在回调里可以再次注册 / 删除事件,就这样不断驱动着进程的运行。
epoll 的原理其实也类似事件驱动,它底层维护用户注册的事件和文件描述符,本身也会在文件描述符对应的文件 / socket / 管道处注册一个回调,等被通知有事件发生的时候,就会把 fd 和事件返回给用户,大致原理如下。
function epoll_wait() { for 事件个数 // 调用文件系统的函数判断 if (事件 [i] 中对应的文件描述符中有某个用户感兴趣的事件发生 ?) { 插入就绪事件队列 } else { /*在事件 [i] 中的文件描述符所对应的文件 / socke / 管道等资源中注册回调。感兴趣的事件触发后回调 epoll,回调 epoll 后,epoll 把该 event[i] 插入就绪事件队列返回给用户 */}
}
SO_REUSEPORT 端口复用
新版 Linux 支持 SO_REUSEPORT 特性后,使得服务器性能有了很大的提升。 SO_REUSEPORT 之前,一个 socket 是无法绑定到同一个地址的,通常的做法是主进程接收连接然后传递给子进程处理,或者主进程 bind 后 fork 子进程,然后子进程执行 listen,但底层是共享同一个 socket,所以连接到来时所有子进程都会被唤醒,但是只有一个连接可以处理这个请求,其他的进程被无效唤醒。SO_REUSEPORT 特性支持多个子进程对应多个监听 socket,多个 socket 绑定到同一个地址,当连接到来时,操作系统会根据地址信息找到一组 socket,然后根据策略选择一个 socket 并唤醒阻塞在该 socket 的进程,被 socket 唤醒的进程处理自己的监听 socket 下的连接就行,架构如下。
除了前面介绍的模式外,还有基于协程的模式,服务器技术繁多,就不一一介绍了。
IO 模型
IO 模型是服务器中非常重要的部分,操作系统通常会提供了多种 IO 模型,常见的如下。
阻塞 IO
当线程执行一个 IO 操作时,如果不满足条件,当前线程会被阻塞,然后操作系统会调度其他线程执行。
非阻塞 IO
非阻塞 IO 在不满足条件的情况下直接返回一个错误码给线程,而不是阻塞线程。
那么这个阻塞是什么意思呢?直接看一段操作系统的代码。
// 没有空间可写了
while(!(space = UN_BUF_SPACE(pupd)))
{// 非阻塞模式,直接返回错误码if (nonblock) return(-EAGAIN);// 阻塞模式,进入阻塞状态interruptible_sleep_on(sock->wait);
}void interruptible_sleep_on(struct wait_queue **p)
{// 修改线程状态为阻塞状态__sleep_on(p,TASK_INTERRUPTIBLE);
}static inline void __sleep_on(struct wait_queue **p, int state)
{unsigned long flags;// current 代表当前执行的线程struct wait_queue wait = { current, NULL };// 修改线程状态为阻塞状态current->state = state;// 当前线程加入到资源的阻塞队列,资源就绪后唤醒线程add_wait_queue(p, &wait);// 重新调度其他线程执行,即从就绪的线程中选择一个来执行schedule();
}
通过这段代码,我们就可以非常明确地了解到阻塞和非阻塞到底是指什么。
多路复用 IO
在阻塞式 IO 中,我们需要通过阻塞进程来感知 IO 是否就绪,在非阻塞式 IO 中,我们需要通过轮询来感知 IO 是否就绪,这些都不是合适的方式。为了更好感知 IO 是否就绪,操作系统实现了订阅发布机制,我们只需要注册感兴趣的 fd 和事件,当事件发生时我们就可以感知到。多路复用 IO 可以同时订阅多个 fd 的多个事件,是现在高性能服务器的基石。看一个例子。
#include <sys/event.h>
#include <fcntl.h>
#include <stdio.h>int main(int argc, char **argv)
{ // 用于注册事件到 kqueuestruct kevent event;// 用于接收从 kqueue 返回到事件,表示哪个 fd 触发了哪些事件struct kevent emit_event;int kqueue_fd, file_fd, result;// 打开需要监控的文件,拿到一个 fdfile_fd = open(argv[1], O_RDONLY);if (file_fd == -1) {printf("Fail to open %s", argv[1]);return 1;}// 创建 kqueue 实例kqueue_fd = kqueue();// 设置需要监听的事件,文件被写入时触发EV_SET(&event,file_fd, EVFILT_VNODE, EV_ADD | EV_CLEAR, NOTE_RENAME, 0, NULL);// 注册到操作系统result = kevent(kqueue_fd, &event, 1, NULL, 0, NULL);// 不断阻塞等待,直到文件被写入while(1) {// result 返回触发事件的 fd 个数,这里是一个result = kevent(kqueue_fd, NULL, 0, &emit_event, 1, NULL);if (result > 0) {printf("%s have been renamed\n", argv[1]);}}
}
异步 IO
前面介绍的几种 IO 模型中,当 IO 就绪时需要自己执行读写操作,而异步 IO 是 IO 就绪时,操作系统帮助线程完成 IO 操作,然后再通知线程操作完成了。下面以 io_uring(Linux 中的异步 IO 框架) 为例了解下具体的情况。
uv_loop_t* loop;
napi_get_uv_event_loop(env, &loop);
struct io_uring_info *io_uring_data = (io_uring_info *)loop->data;
// 申请内存
struct request *req = (struct request *)malloc(sizeof(*req) + (sizeof(struct iovec) * 1));
req->fd = fd;
req->offset = offset;
// 保存回调
napi_create_reference(env, args[2], 1, &req->func);
req->env = env;
req->nvecs = 1;
// 记录buffer大小
req->iovecs[0].iov_len = bufferLength;
// 记录内存地址
req->iovecs[0].iov_base = bufferData;
// 提交给操作系统,操作系统读完后通知线程,op 为 IORING_OP_READV 表示读操作
submit_request(op, req, &io_uring_data->ring);
上面的代码就是我们提交了一个读请求给操作系统,然后操作系统在文件可读并且读完成后通知我们。
Libuv 虽然写着是异步 IO 库,但是它并不是真正的异步 IO。它的意思是,你提交一个 IO 请求时,可以注册一个回调,然后就可以去做其他事情了,等操作完成后它会通知你,它的底层实现是线程池 + 多路复用 IO。
Node.js TCP 服务器的实现
Node.js 服务器的底层是 IO 多路复用 + 非阻塞 IO,所以可以轻松处理成千上万的请求,但是因为 Node.js 是单线程的,所以更适合处理 IO 密集型的任务。下面看看 Node.js 中服务器是如何实现的。
启动服务器
在 Node.js 中,我们通常使用以下方式创建一个服务器。
// 创建一个 TCP Server
const server = net.createServer((socket) => {// 处理连接
});// 监听端口,启动服务器
server.listen(8888);
使用 net.createServer 可以创建一个服务器,然后拿到一个 Server 对象,接着调用 Server 对象的 listen 函数就可以启动一个 TCP 服务器了。下面来看一下具体的实现。
function createServer(options, connectionListener) { return new Server(options, connectionListener);
} function Server(options, connectionListener) { EventEmitter.call(this); // 服务器收到的连接数,可以通过 maxConnections 限制并发连接数 this._connections = 0; // C++ 层的对象,真正实现 TCP 功能的地方this._handle = null; // 服务器下的连接是否允许半关闭this.allowHalfOpen = options.allowHalfOpen || false; // 有连接时是否注册可读事件,如果该 socket 是交给其他进程处理的话可以设置为 true this.pauseOnConnect = !!options.pauseOnConnect;
}
createServer 返回的是一个一般的 JS 对象,继续看一下 listen 函数的逻辑,listen 函数逻辑很繁琐,但是原理大致是一样的,所以只讲解常用的情况。
Server.prototype.listen = function(...args) { /*处理入参,listen 可以接收很多种格式的参数,假设我们这里只传了 8888 端口号*/const normalized = normalizeArgs(args); // normalized = [{port: 8888}, null]; const options = normalized[0]; // 监听成功后的回调 const cb = normalized[1]; // listen 成功后执行的回调 if (cb !== null) {this.once('listening', cb);}listenIncluster(this, null, options.port | 0, 4, ...); return this;
};
listen 处理了入参后,接着调用了 listenIncluster。
function listenIncluster(server, address, port, addressType, backlog, fd, exclusive) { exclusive = !!exclusive; if (cluster === null) cluster = require('cluster'); if (cluster.isMaster || exclusive) { server._listen2(address, port, addressType, backlog, fd);return; }
}
这里只分析在主进程创建服务器的情况,listenIncluster 中执行了 _listen2,_listen2 对应的函数是 setupListenHandle。
function setupListenHandle(address, port, addressType, backlog, fd) { // 通过 C++ 层导出的 API 创建一个对象,该对象关联了 C++ 层的 TCPWrap 对象this._handle = new TCP(TCPConstants.SERVER);// 创建 socket 并绑定地址到 socket 中this._handle.bind(address, port); // 有完成三次握手的连接时执行的回调 this._handle.onconnection = onconnection; // 互相关联this._handle.owner = this; // 执行 C++ 层 listen this._handle.listen(backlog || 511); // 触发 listen 回调 nextTick(this[async_id_symbol], emitListeningNT, this);
}
setupListenHandle 的逻辑如下。
- 调用 new TCP 创建一个 handle(new TCP 对象关联了 C++ 层的 TCPWrap 对象)。
- 保存处理连接的函数 onconnection,当有连接时被执行。
- 调用了 bind 绑定地址到 socket。
- 调用 listen 函数修改 socket 状态为监听状态。
首先看看 new TCP 做了什么。
void TCPWrap::New(const FunctionCallbackInfo<Value>& args) {new TCPWrap(env, args.This(), ...);
}TCPWrap::TCPWrap(Environment* env, Local<Object> object, ProviderType provider): ConnectionWrap(env, object, provider) {// 初始化一个 tcp handleint r = uv_tcp_init(env->event_loop(), &handle_);
}
new TCP 本质上是创建一个 TCP 层的 TCPWrap 对象,并初始化了 Libuv 的数据结构 uv_tcp_t(TCPWrap 是对 Libuv uv_tcp_t 的封装)。接着看 bind。
template <typename T>
void TCPWrap::Bind(...) {// 通过 JS 对象拿到关联的 C++ TCPWrap 对象TCPWrap* wrap;ASSIGN_OR_RETURN_UNWRAP(&wrap,args.Holder(),args.GetReturnValue().Set(UV_EBADF));// 通过 JS 传入的地址信息直接调用 Libuvuv_tcp_bind(&wrap->handle_,reinterpret_cast<const sockaddr*>(&addr),flags);
}
Bind 函数的逻辑很简单,直接调用了 Libuv 函数。
int uv_tcp_bind(...) {return uv__tcp_bind(handle, addr, addrlen, flags);
}int uv__tcp_bind(uv_tcp_t* tcp,const struct sockaddr* addr,unsigned int addrlen,unsigned int flags) {// 创建一个 socket,并把返回的 fd 保存到 tcp 结构体中maybe_new_socket(tcp, addr->sa_family, 0);on = 1;// 默认设置了 SO_REUSEADDR 属性,后面具体分析setsockopt(tcp->io_watcher.fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));// 绑定地址信息到 socketbind(tcp->io_watcher.fd, addr, addrlen);return 0;
}
uv__tcp_bind 创建了一个 TCP socket 然后把地址信息保存到该 socket 中,执行 bind 绑定了地址信息后就继续调用 listen 把 socket 变成监听状态,C++ 层代码和 Bind 的差不多,就不再分析,直接看 Libuv 的代码。
int uv_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb) {uv_tcp_listen((uv_tcp_t*)stream, backlog, cb);
}int uv_tcp_listen(uv_tcp_t* tcp, int backlog, uv_connection_cb cb) {static int single_accept = -1;unsigned long flags;int err;// 已废弃if (single_accept == -1) {const char* val = getenv("UV_TCP_SINGLE_ACCEPT");single_accept = (val != NULL && atoi(val) != 0); }// 有连接时是否连续接收,或者间歇性处理,见后面分析if (single_accept)tcp->flags |= UV_HANDLE_TCP_SINGLE_ACCEPT;flags = 0;// 设置 flags 到 handle 上,因为已经创建了 socketmaybe_new_socket(tcp, AF_INET, flags);listen(tcp->io_watcher.fd, backlog)// 保存回调,有连接到来时被 Libuv 执行tcp->connection_cb = cb;tcp->flags |= UV_HANDLE_BOUND;// 有连接来时的处理函数,该函数再执行上面的 connection_cbtcp->io_watcher.cb = uv__server_io;// 注册可读事件,等待连接到来uv__io_start(tcp->loop, &tcp->io_watcher, POLLIN);return 0;
}
uv_tcp_listen 首先调用了 listen 函数修改 socket 状态为监听状态,这样才能接收 TCP 连接,接着保存了 C++ 层的回调,并设置 Libuv 层的回调,最后注册可读事件等待 TCP 连接的到来。这里需要注意两个回调函数的执行顺序,当有 TCP 连接到来时 Libuv 会执行 uv__server_io,在 uv__server_io 里再执行 C++ 层的回调 cb。至此,服务器就启动了。其中 uv__io_start 最终会把服务器对应的文件描述符注册到 IO多路 复用模块中。
处理连接
当有三次握手的连接完成时,操作系统会新建一个通信的 socket,并通知 Libuv,Libuv 会执行 uv__server_io。
void uv__server_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {uv_stream_t* stream;int err;stream = container_of(w, uv_stream_t, io_watcher);uv__io_start(stream->loop, &stream->io_watcher, POLLIN);// 回调了可能关闭了 server,所以需要实时判断while (uv__stream_fd(stream) != -1) {// 摘取一个 TCP 连接,成功的话,err 保存了对应的 fderr = uv__accept(uv__stream_fd(stream));// 保存 fd 在 accepted_fd,等待处理stream->accepted_fd = err;// 执行回调stream->connection_cb(stream, 0);// 如果回调里没有处理该 accepted_fd,则注销可读事件、先不处理新的连接if (stream->accepted_fd != -1) {uv__io_stop(loop, &stream->io_watcher, POLLIN);return;}// 设置了 UV_HANDLE_TCP_SINGLE_ACCEPT 则进入睡眠,让其他进程有机会参与处理if (stream->type == UV_TCP &&(stream->flags & UV_HANDLE_TCP_SINGLE_ACCEPT)) {struct timespec timeout = { 0, 1 };nanosleep(&timeout, NULL);}}
}
uv__server_io 中通过 uv__accept 从操作系统中摘取一个完成连接的 TCP socket 并拿到一个 fd ,接着保存到 accepted_fd 中并执行 connection_cb 回调。
此外,我们需要注意 UV_HANDLE_TCP_SINGLE_ACCEPT 标记。因为可能有多个进程监听同一个端口,当多个连接到来时,多个进程可能会竞争处理这些连接(惊群问题)。这样一来,首先被调度的进程可能会直接处理所有的连接,导致负载不均衡。通过 UV_HANDLE_TCP_SINGLE_ACCEPT 标记,可以在通知进程接收连接时,每接收到一个后先睡眠一段时间,让其他进程也有机会接收连接,一定程度解决负载不均衡的问题,不过这个逻辑最近被去掉了,Libuv 维护者 bnoordhuis 的理由是,第二次调用 uv__accept 时有 99.9% 的概念会返回 EAGAIN,那就是没有更多的连接可以处理,这样额外调用 uv__accept 带来的系统调用开销是比较可观的。
接着看 connection_cb,connection_cb 对应的是 C++ 层的 OnConnection。
// WrapType 为 TCPWrap,UVType 为 uv_tcp_t
template <typename WrapType, typename UVType>
void ConnectionWrap<WrapType, UVType>::OnConnection(uv_stream_t* handle, int status) { // HandleWrap 中保存了 handle 和 TCPWrap 的关系,这里取出来使用 WrapType* wrap_data = static_cast<WrapType*>(handle->data); Environment* env = wrap_data->env(); Local<Value> argv[] = { Integer::New(env->isolate(), status), Undefined(env->isolate()) }; // 新建一个表示和客户端通信的对象,和 JS 层执行 new TCP 一样 Local<Object> client_obj = WrapType::Instantiate(env,wrap_data,WrapType::SOCKET); WrapType* wrap; // 从 client_obj 中取出关联的 TCPWrap 对象存到 wrap 中 ASSIGN_OR_RETURN_UNWRAP(&wrap, client_obj); // 拿到 TCPWrap 中的 uv_tcp_t 结构体,再转成 uv_stream_t,因为它们类似父类和子类的关系uv_stream_t* client_handle = reinterpret_cast<uv_stream_t*>(&wrap->handle_); // 把通信 fd 存储到 client_handle 中 uv_accept(handle, client_handle);argv[1] = client_obj; // 回调上层的 onconnection 函数 wrap_data->MakeCallback(env->onconnection_string(), arraysize(argv), argv);
}
当建立了新连接时,操作系统会新建一个 socket。同样,在 Node.js 层,也会通过 Instantiate 函数新建一个对应的对象表示和客户端的通信。结构如下所示。
Instantiate 代码如下所示。
MaybeLocal<Object> TCPWrap::Instantiate(Environment* env,AsyncWrap* parent,TCPWrap::SocketType type) {// 拿到导出到 JS 层的 TCP 构造函数(缓存在env中)Local<Function> constructor = env->tcp_constructor_template()->GetFunction(env->context()).ToLocalChecked();Local<Value> type_value = Int32::New(env->isolate(), type);// 相当于我们在 JS 层调用 new TCP() 时拿到的对象return handle_scope.EscapeMaybe(constructor->NewInstance(env->context(), 1, &type_value));
}
新建完和对端通信的对象后,接着调用 uv_accept 消费刚才保存在 accepted_fd 中的 fd,并把对应的 fd 保存到 C++ TCPWrap 对象的 uv_tcp_t 结构体中。
int uv_accept(uv_stream_t* server, uv_stream_t* client) {int err;// 把 accepted_fd 保存到 client 中uv__stream_open(client,server->accepted_fd,UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);// 处理了,重置该字段server->accepted_fd = -1;// 保证注册了可读事件,继续处理新的连接uv__io_start(server->loop, &server->io_watcher, POLLIN);return err;
}
C++ 层拿到一个新的对象并且保存了 fd 到对象后,接着回调 JS 层的 onconnection。
// clientHandle 代表一个和客户端建立 TCP 连接的实体
function onconnection(err, clientHandle) { const handle = this; const self = handle.owner; // 新建一个 socket 用于通信 const socket = new Socket({ handle: clientHandle, allowHalfOpen: self.allowHalfOpen, pauseOnCreate: self.pauseOnConnect }); // 服务器的连接数加一 self._connections++; // 触发用户层连接事件 self.emit('connection', socket);
}
在 JS 层也会封装一个 Socket 对象用于管理和客户端的通信,整体的关系如下。
接着触发 connection 事件,剩下的事情就是应用层处理了,整体流程如下。
Node.js HTTP 服务器的创建
接着看看 HTTP 服务器的实现。下面是 Node.js 中创建服务器的例子。
const http = require('http');
http.createServer((req, res) => { res.write('hello'); res.end();
})
.listen(3000);
我们沿着 createServer 开始分析。
function createServer(opts, requestListener) { return new Server(opts, requestListener);
}
createServer 中创建了一个 Server 对象,来看看 Server 初始化的逻辑。
function Server(options, requestListener) { // 可以自定义表示请求的对象和响应的对象 this[kIncomingMessage] = options.IncomingMessage || IncomingMessage; this[kServerResponse] = options.ServerResponse || ServerResponse; // HTTP 头最大字节数 this.maxHeaderSize = options.maxHeaderSize; // 允许半关闭 net.Server.call(this, { allowHalfOpen: true }); // 有请求时的回调 if (requestListener) { this.on('request', requestListener); } // 服务器 socket 读端关闭时是否允许继续处理队列里的响应(TCP 上有多个请求,管道化) this.httpAllowHalfOpen = false; // 有连接时的回调,由 net 模块触发 this.on('connection', connectionListener); // 服务器下所有请求和响应的超时时间 this.timeout = 0; // 同一个 TCP 连接上,两个请求之前最多间隔的时间 this.keepAliveTimeout = 5000; // HTTP 头的最大个数this.maxHeadersCount = null; // 解析头部的最长时间,防止 ddos this.headersTimeout = 60 * 1000;
}
Server 中主要做了一些字段的初始化,并且监听了 connection 和 request 两个事件,当有连接到来时会触发 connection 事件,connection 事件的处理函数会调用 HTTP 解析器进行数据的解析,当解析出一个 HTTP 请求时就会触发 request 事件通知用户。
创建了 Server 对象后,接着我们调用它的 listen 函数。因为 HTTP Server 继承于 net.Server,所以执行 HTTP Server 的 listen 函数时,其实是执行了 net.Serve 的 listen 函数,net.Server 的 listen 函数前面已经分析过,就不再分析。当有请求到来时,会触发 connection 事件,从而执行 connectionListener。
function connectionListener(socket) { defaultTriggerAsyncIdScope( getOrSetAsyncId(socket), connectionListenerInternal, this, socket );
} // socket 表示新连接
function connectionListenerInternal(server, socket) { // socket 所属 server socket.server = server; // 分配一个 HTTP 解析器 const parser = parsers.alloc(); // 初始化解析器parser.initialize(HTTPParser.REQUEST, ...); // 关联起来parser.socket = socket; socket.parser = parser; const state = { onData: null, // 同一 TCP 连接上,请求和响应的的队列,线头阻塞的原理 outgoing: [], incoming: [], }; // 监听 TCP 上的数据,开始解析 HTTP 报文 state.onData = socketOnData.bind(undefined, server, socket, parser, state); socket.on('data', state.onData);// 解析 HTTP 头部完成后执行的回调 parser.onIncoming = parserOnIncoming.bind(undefined, server, socket, state); /*如果 handle 是继承 StreamBase 的流,则在 C++ 层解析 HTTP 请求报文,否则使用上面的 socketOnData 函数处理 HTTP 请求报文,TCP 模块的 isStreamBase 为 true */if (socket._handle && socket._handle.isStreamBase && !socket._handle._consumed) { parser._consumed = true; socket._handle._consumed = true; parser.consume(socket._handle); } // 执行 llhttp_execute 时的回调parser[kOnExecute] = onParserExecute.bind(undefined, server, socket, parser, state);
}
上面的 connectionListenerInternal 函数中首先分配了一个 HTTP 解析器,HTTP 解析器由以下代码管理。
const parsers = new FreeList('parsers', 1000, function parsersCb() {const parser = new HTTPParser();cleanParser(parser);parser.onIncoming = null;// 各种钩子毁掉parser[kOnHeaders] = parserOnHeaders;parser[kOnHeadersComplete] = parserOnHeadersComplete;parser[kOnBody] = parserOnBody;parser[kOnMessageComplete] = parserOnMessageComplete;return parser;
});
parsers 用于管理 HTTP 解析器,它负责分配 HTTP 解析器,并且在 HTTP 解析器不再使用时缓存起来给下次使用,而不是每次都创建一个新的解析器。分配完 HTTP 解析器后就开始等待 TCP 上数据的到来,即 HTTP 请求报文。但是这里有一个逻辑需要注意,上面代码中 Node.js 监听了 socket 的 data 事件,处理函数为 socketOnData,下面是 socketOnData 的逻辑。
function socketOnData(server, socket, parser, state, d) { // 交给 HTTP 解析器处理,返回已经解析的字节数 const ret = parser.execute(d);
}
socketOnData 调用 HTTP 解析器处理数据,这看起来没什么问题,但是有一个逻辑我们可能会忽略掉,看一下下面的代码。
if (socket._handle && socket._handle.isStreamBase) { parser.consume(socket._handle);
}
上面代码中,如果 socket._handle.isStreamBase 为 true(TCP handle 的 isStreamBase 为 true),则会执行 parser.consume(socket._handle),这个是做什么的呢?
static void Consume(const FunctionCallbackInfo<Value>& args) {Parser* parser;ASSIGN_OR_RETURN_UNWRAP(&parser, args.Holder());// 解析出 C++ TCPWrap 对象StreamBase* stream = StreamBase::FromObject(args[0].As<Object>());// 注册 parser 成为流的消费者,即 TCP 数据的消费者stream->PushStreamListener(parser);
}
Consume 会注册 parser 会成为流的消费者,这个逻辑会覆盖掉刚才的 onData 函数,使得所有的数据直接由 parser 处理,看一下当数据到来时,parser 是如何处理的。
void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override { // 解析 HTTP 协议Local<Value> ret = Execute(buf.base, nread); // 执行 kOnExecute 回调Local<Value> cb = object()->Get(env()->context(), kOnExecute).ToLocalChecked(); MakeCallback(cb.As<Function>(), 1, &ret);
}
在 OnStreamRead 中会源源不断地把数据交给 HTTP 解析器处理并执行 kOnExecute 回调,并且在解析的过程中,会不断触发对应的钩子函数。比如解析到 HTTP 头部时执行 parserOnHeaders。
function parserOnHeaders(headers, url) {// 记录解析到的 HTTP 头if (this.maxHeaderPairs <= 0 ||this._headers.length < this.maxHeaderPairs) {this._headers = this._headers.concat(headers);}this._url += url;
}
parserOnHeaders 会记录解析到的 HTTP 头,当解析完 HTTP 头 时会调用 parserOnHeadersComplete。
function parserOnHeadersComplete(versionMajor, versionMinor, headers, method,url, statusCode, statusMessage, upgrade,shouldKeepAlive) {const parser = this;const { socket } = parser;// 创建一个对象表示收到的 HTTP 请求const ParserIncomingMessage = (socket && socket.server &&socket.server[kIncomingMessage]) ||IncomingMessage;// 新建一个IncomingMessage对象const incoming = parser.incoming = new ParserIncomingMessage(socket);// 执行回调return parser.onIncoming(incoming, shouldKeepAlive);
}
parserOnHeadersComplete 中创建了一个对象来表示收到的 HTTP 请求,接着执行 onIncoming 函数,对应的是 parserOnIncoming。
function parserOnIncoming(server, socket, state, req, keepAlive) { // 请求入队(待处理的请求队列) state.incoming.push(req); // 新建一个表示响应的对象 const res = new server[kServerResponse](req); /*socket 当前已经在处理其它请求的响应,则先排队,否则挂载响应对象到 socket,作为当前处理的响应 */if (socket._httpMessage) { state.outgoing.push(res); } else { res.assignSocket(socket); } // 响应处理完毕后,需要做一些处理 res.on('finish', resOnFinish.bind(undefined, req, res, socket, state, server)); // 触发 request 事件说明有请求到来 server.emit('request', req, res);
}
我们看到这里会触发 request 事件通知用户有新请求到来,并传入request和response作为参数,这样用户就可以处理请求了。另外 Node.js 本身是不会处理 HTTP 请求体的数据,当 Node.js 解析到请求体时会执行 kOnBody 钩子函数,对应的是 parserOnBody 函数。
function parserOnBody(b, start, len) {// IncomingMessage 对象,即 request 对象const stream = this.incoming;// Pretend this was the result of a stream._read call.if (len > 0 && !stream._dumped) {const slice = b.slice(start, start + len);const ret = stream.push(slice);if (!ret)readStop(this.socket);}
}
parserOnBody 会把数据 push 到请求对象 request 中,接着 Node.js 会触发 data 事件,所以我们可以通过以下方式获取 body 的数据。
const server= http.createServer((request, response) => { request.on('data', (chunk) => { // 处理body }); request.on('end', () => { // body结束 });
})
Node.js 的多进程服务器架构
虽然 Node.js 是单进程单线程的应用,但是我们可以创建多个进程来共同请求。在创建 HTTP 服务器时会调用 net 模块的 listen,然后调用 listenIncluster。我们从该函数开始分析。
function listenIncluster(server, address, port, addressType, backlog, fd, exclusive, flags) { const serverQuery = { address: address, port: port, addressType: addressType, fd: fd, flags, }; cluster._getServer(server, serverQuery, listenOnMasterHandle); function listenOnMasterHandle(err, handle) { server._handle = handle; server._listen2(address,port, addressType, backlog, fd, flags); }
}
listenIncluster 函数会调用子进程 cluster 模块的 _getServer 函数。
cluster._getServer = function(obj, options, cb) { let address = options.address; const message = { act: 'queryServer', index, data: null, ...options }; message.address = address; // 给主进程发送消息 send(message, (reply, handle) => { // 根据不同模式做处理if (handle) shared(reply, handle, indexesKey, cb); else rr(reply, indexesKey, cb); });
};
从上面代码中可以看到,_getServer 函数会给主进程发送一个 queryServer 的请求并设置了一个回调函数。看一下主进程是如何处理 queryServer 请求的。
function queryServer(worker, message) { const key = `${message.address}:${message.port}:${message.addressType}:${message.fd}:${message.index}`; let handle = handles.get(key); if (handle === undefined) { let address = message.address; let constructor = RoundRobinHandle; // 根据策略选取不同的构造函数,UDP 只能使用共享模式,因为 UDP 不是基于连接的,没有连接可以分发 if (schedulingPolicy !== SCHED_RR || message.addressType === 'udp4' || message.addressType === 'udp6') { constructor = SharedHandle; } handle = new constructor(key, address, message.port, message.addressType, message.fd, message.flags); handles.set(key, handle); } handle.add(worker, (errno, reply, handle) => { const { data } = handles.get(key); // 返回结果给子进程send(worker, { errno, key, ack: message.seq, data, ...reply }, handle); });
}
queryServer 首先根据调度策略选择构造函数并创建一个对象,然后执行该对象的 add 方法并且传入一个回调。下面看看不同策略下的处理。
共享模式
首先看看共享模式的实现,共享模式对应前面分析的主进程管理子进程,多个子进程共同 accept 处理连接这种方式。
function SharedHandle(key, address, port, addressType, fd, flags) { this.key = key; this.workers = []; this.handle = null; this.errno = 0; let rval; if (addressType === 'udp4' || addressType === 'udp6') rval = dgram._createSocketHandle(address, port, addressType, fd, flags); else rval = net._createServerHandle(address, port, addressType, fd, flags); if (typeof rval === 'number') this.errno = rval; else this.handle = rval;
}
SharedHandle 是共享模式,即主进程创建好 handle,交给子进程处理,接着看它的 add 函数。
SharedHandle.prototype.add = function(worker, send) { this.workers.push(worker); send(this.errno, null, this.handle);
};
SharedHandle 的 add 把 SharedHandle 中创建的 handle 返回给子进程。接着看子进程拿到 handle 后的处理。
function shared(message, handle, indexesKey, cb) { const key = message.key; const close = handle.close; handle.close = function() { send({ act: 'close', key }); handles.delete(key); indexes.delete(indexesKey); // 因为是共享的,可以直接 close 掉而不会影响其它子进程等return close.apply(handle, arguments); }; handles.set(key, handle); // 执行 net 模块的回调 cb(message.errno, handle);
}
shared 函数把接收到的 handle 再回传到调用方,即 net 模块的 listenOnMasterHandle 函数,listenOnMasterHandle 会执行 listen 开始监听地址。
function setupListenHandle(address, port, addressType, backlog, fd, flags) {// this._handle 即主进程返回的 handle// 连接到来时的回调this._handle.onconnection = onconnection;this._handle[owner_symbol] = this;const err = this._handle.listen(backlog || 511);
}
这样多个子进程就成功启动了服务器。共享模式的核心逻辑是主进程在 _createServerHandle 创建 handle 时执行 bind 绑定了地址(但没有 listen),然后通过文件描述符传递的方式传给子进程,子进程执行 listen 的时候就不会报端口已经被监听的错误了,因为端口被监听的错误是执行 bind 的时候返回的。逻辑如下图所示。
看一个共享模式的使用例子。
const cluster = require('cluster');
const os = require('os');
// 设置为共享模式
cluster.schedulingPolicy = cluster.SCHED_NONE;// 主进程 fork 多个子进程
if (cluster.isMaster) {// 通常根据 CPU 核数创建多个进程 os.cpus().lengthfor (let i = 0; i < 3; i++) {cluster.fork();}
} else { // 子进程创建服务器const net = require('net');const server = net.createServer((socket) => {socket.destroy();console.log(`handled by process: ${process.pid}`);});server.listen(8080);
}
轮询模式
接着看轮询模式,轮询模式对应前面的主进程 accept,分发给多个子进程处理这种方式。
function RoundRobinHandle(key, address, port, addressType, fd, flags) { this.key = key; this.all = new Map(); this.free = []; this.handles = []; this.handle = null; this.server = net.createServer(assert.fail); if (fd >= 0) this.server.listen({ fd }); else if (port >= 0) { // 启动一个服务器this.server.listen({ port, host: address, ipv6Only: Boolean(flags & constants.UV_TCP_IPV6ONLY), }); } else this.server.listen(address); // UNIX socket path. // 监听成功后,注册 onconnection 回调,有连接到来时执行 this.server.once('listening', () => { this.handle = this.server._handle; // 分发请求给子进程this.handle.onconnection = (err, handle) => this.distribute(err, handle); this.server._handle = null; this.server = null; });
}
因为 RoundRobinHandle的 工作模式是主进程负责监听,收到连接后分发给子进程,所以 RoundRobinHandle 中直接启动了一个服务器,当收到连接时执行 this.distribute 进行分发。接着看一下RoundRobinHandle 的 add 函数。
RoundRobinHandle.prototype.add = function(worker, send) { this.all.set(worker.id, worker); const done = () => { // send 的第三个参数是 null,说明没有 handleif (this.handle.getsockname) { const out = {}; this.handle.getsockname(out); send(null, { sockname: out }, null); } else { send(null, null, null); // UNIX socket. } this.handoff(worker); }; // 否则等待 listen 成功后执行回调 this.server.once('listening', done);
};
RoundRobinHandle 会在 listen 成功后执行回调。我们回顾一下执行 add 函数时的回调。
handle.add(worker, (errno, reply, handle) => { const { data } = handles.get(key); send(worker, { errno, key, ack: message.seq, data, ...reply }, handle);
});
回调函数会把 handle 等信息返回给子进程。但是在 RoundRobinHandle 和 SharedHandle 中返回的 handle 是不一样的,分别是 null 和 net.createServer 实例,因为前者不需要启动一个服务器,它只需要接收来自父进程传递的连接就行。
接着我们回到子进程的上下文,看子进程是如何处理的,刚才我们讲过,不同的调度策略,返回的 handle 是不一样的,我们看轮询模式下的处理。
function rr(message, indexesKey, cb) { let key = message.key; // 不需要 listen,空操作function listen(backlog) { return 0; } function close() {// 因为 handle 是共享的,所以无法直接关闭,需要告诉父进程,引用数减一if (key === undefined)return;send({ act: 'close', key });handles.delete(key);indexes.delete(indexesKey);key = undefined;} // 构造假的 handle 给调用方const handle = { close, listen, ref: noop, unref: noop }; handles.set(key, handle); // 执行 net 模块的回调 cb(0, handle);
}
round-robin 模式下,Node.js 会构造一个假的 handle 返回给 net 模块,因为调用方会调用 handle 的这些函数。当有请求到来时,round-bobin 模块会执行 distribute 分发连接给子进程。
RoundRobinHandle.prototype.distribute = function(err, handle) { // 首先保存 handle 到队列 this.handles.push(handle); // 从空闲队列获取一个子进程 const worker = this.free.shift(); // 分发 if (worker) this.handoff(worker);
}; RoundRobinHandle.prototype.handoff = function(worker) { // 拿到一个 handle const handle = this.handles.shift(); // 没有 handle,则子进程重新入队 if (handle === undefined) { this.free.push(worker);return; } // 通知子进程有新连接 const message = { act: 'newconn', key: this.key }; sendHelper(worker.process, message, handle, (reply) => { // 接收成功 if (reply.accepted) handle.close(); else // 结束失败,则重新分发 this.distribute(0, handle);// 继续分发this.handoff(worker); });
};
可以看到 Node.js 没用按照严格的轮询,而是哪个进程接收连接快,就继续给它分发。接着看一下子进程是怎么处理该请求的。
function onmessage(message, handle) { if (message.act === 'newconn') onconnection(message, handle);
} function onconnection(message, handle) { const key = message.key; const server = handles.get(key); const accepted = server !== undefined; // 回复接收成功 send({ ack: message.seq, accepted }); if (accepted) // 在 net 模块设置server.onconnection(0, handle);
}
最终执行 server.onconnection 进行连接的处理。逻辑如下图所示。
看一下轮询模式的使用例子。
const cluster = require('cluster');
const os = require('os');
// 设置为轮询模式
cluster.schedulingPolicy = cluster.SCHED_RR;// 主进程 fork 多个子进程
if (cluster.isMaster) {// 通常根据 CPU 核数创建多个进程 os.cpus().lengthfor (let i = 0; i < 3; i++) {cluster.fork();}
} else { // 子进程创建服务器const net = require('net');const server = net.createServer((socket) => {socket.destroy();console.log(`handled by process: ${process.pid}`);});server.listen(8080);
}
实现一个高性能的服务器是非常复杂的,涉及到很多复杂的知识,但是即使不是服务器开发者,了解服务器相关的一些知识也是非常有用的。
相关文章:
Node.js 是如何处理请求的
前言:在服务器软件中,如何处理请求是非常核心的问题。不管是底层架构的设计、IO 模型的选择,还是上层的处理都会影响一个服务器的性能,本文介绍 Node.js 在这方面的内容。 TCP 协议的核心概念 要了解服务器的工作原理首先需要了…...
数据结构与算法之堆: Leetcode 215. 数组中的第K个最大元素 (Typescript版)
数组中的第K个最大元素 https://leetcode.cn/problems/kth-largest-element-in-an-array/ 描述 给定整数数组 nums 和整数 k,请返回数组中第 k 个最大的元素。请注意,你需要找的是数组排序后的第 k 个最大的元素,而不是第 k 个不同的元素。…...
SpringBoot快速入门
搭建SpringBoot工程,定义hello方法,返回“Hello SpringBoot” ②导入springboot工程需要继承的父工程;以及web开发的起步依赖。 ③编写Controller ④引导类就是SpringBoot项目的一个入口。 写注解写main方法调用run方法 快速构建SpringBoo…...
深度学习笔记_4、CNN卷积神经网络+全连接神经网络解决MNIST数据
1、首先,导入所需的库和模块,包括NumPy、PyTorch、MNIST数据集、数据处理工具、模型层、优化器、损失函数、混淆矩阵、绘图工具以及数据处理工具。 import numpy as np import torch from torchvision.datasets import mnist import torchvision.transf…...
高效的开发流程搭建
目录 1. 搭建 AI codebase 环境kaggle的服务器1. 搭建 AI codebase 环境 python 、torch 以及 cuda版本,对AI的影响最大。不同的版本,可能最终计算出的结果会有区别。 硬盘:PCIE转SSD的卡槽,, GPU: 软件源: Anaconda: 一定要放到固态硬盘上。 VS code 的 debug功能…...
浅谈OV SSL 证书的优势
随着网络威胁日益增多,保护网站和用户安全已成为每个企业和组织的重要任务。在众多SSL证书类型中,OV(Organization Validation)证书以其独特的优势备受关注。让我们深入探究OV证书的优势所在,为网站安全搭建坚实的防线…...
一篇博客学会系列(3) —— 对动态内存管理的深度讲解以及经典笔试题的深度解析
目录 动态内存管理 1、为什么存在动态内存管理 2、动态内存函数的介绍 2.1、malloc和free 2.2、calloc 2.3、realloc 3、常见的动态内存错误 3.1、对NULL指针的解引用操作 3.2、对动态开辟空间的越界访问 3.3、对非动态开辟内存使用free释放 3.4、使用free释放一块动态…...
【C++ techniques】虚化构造函数、虚化非成员函数
constructor的虚化 virtual function:完成“因类型而异”的行为;constructor:明确类型时构造函数;virtual constructor:视其获得的输入,可产生不同的类型对象。 //假如写一个软件,用来处理时事…...
蓝牙核心规范(V5.4)11.6-LE Audio 笔记之初识音频位置和通道分配
专栏汇总网址:蓝牙篇之蓝牙核心规范学习笔记(V5.4)汇总_蓝牙核心规范中文版_心跳包的博客-CSDN博客 爬虫网站无德,任何非CSDN看到的这篇文章都是盗版网站,你也看不全。认准原始网址。!!! 音频位置 在以前的每个蓝牙音频规范中,只有一个蓝牙LE音频源和一个蓝牙LE音频接…...
mysql双主+双从集群连接模式
架构图: 详细内容参考: 结果展示: 178.119.30.14(主) 178.119.30.15(主) 178.119.30.16(从) 178.119.30.17(从)...
嵌入式中如何用C语言操作sqlite3(07)
sqlite3编程接口非常多,对于初学者来说,我们暂时只需要掌握常用的几个函数,其他函数自然就知道如何使用了。 数据库 本篇假设数据库为my.db,有数据表student。 nonamescore4嵌入式开发爱好者89.0 创建表格语句如下: CREATE T…...
RandomForestClassifier 与 GradientBoostingClassifier 的区别
RandomForestClassifier(随机森林分类器)和GradientBoostingClassifier(梯度提升分类器)是两种常用的集成学习方法,它们之间的区别分以下几点。 1、基础算法 RandomForestClassifier:随机森林分类器是基于…...
计组——I/O方式
一、程序查询方式 CPU不断轮询检查I/O控制器中“状态寄存器”,检测到状态为“已完成”之后,再从数据寄存器取出输入数据。 过程: 1.CPU执行初始化程序,并预置传送参数;设置计数器、设置数据首地址。 2. 向I/O接口发…...
jsbridge实战2:Swift和h5的jsbridge通信
[[toc]] demo1: 文本通信 h5 -> app 思路: h5 全局属性上挂一个变量app 接收这个变量的内容关键API: navigation代理 navigationAction.request.url?.absoluteString // 这个变量挂载在 request 的 url 上 ,在浏览器实际无法运行,因…...
集合原理简记
HashMap 无论在构造函数是否指定数组长度,进行的都是延迟初始化 构造函数作用: 阈值:threshold,每次<<1 ,数组长度 负载因子 无参构造:设置默认的负载因子 有参:可以指定初始容量或…...
机器学习的超参数 、训练集、归纳偏好
一、介绍 超参数(Hyperparameters)和验证集(Validation Set)是机器学习中重要的概念,用于调整模型和评估其性能。 超参数: 超参数是在机器学习模型训练过程中需要手动设置的参数,而不是从数据…...
Leetcode1071. 字符串的最大公因子(三种方法,带详细解析)
Leetcode1071. 字符串的最大公因子 对于字符串 s 和 t,只有在 s t … t(t 自身连接 1 次或多次)时,我们才认定 “t 能除尽 s”。 给定两个字符串 str1 和 str2 。返回 最长字符串 x,要求满足 x 能除尽 str1 且 x 能…...
如何像人类一样写HTML之图像标签,超链接标签与多媒体标签
文章目录 前言一、图像标签1.1 什么是图像标签?2.2 如何使用图像标签? 二、超链接标签2.1 什么是超链接标签?2.2 如何使用超链接标签? 三、多媒体标签3.1 什么是多媒体标签?3.2 如何使用多媒体audio标签?3.…...
1300*C. Rumor(并查集贪心)
解析: 并查集,求每个集合的最小费用。 每次合并集合的时候,根节点保存当前集合最小的费用。 #include<bits/stdc.h> using namespace std; #define int long long const int N1e55; int n,m,a[N],p[N],cnt[N]; int find(int x){retur…...
python实用小代码(数据分析向)
一、python实现31省市名称替换 首先定义了一个字典province_dict,其中包含每个省市的无缩写名称与其标准名称之间的映射。然后,我们使用map()函数将每个省市名称从无缩写名称转换为标准名称,并将结果存储在新列省市标准名称中。 province_di…...
【oncmdmsg 鼠标】2023/8/19 上午9:50:14
2023/8/19 上午9:50:14 oncmdmsg 鼠标 2023/8/19 上午9:50:22 抱歉,您提到的 “oncmdmsg” 和 “鼠标” 是什么?请提供更多上下文信息,我将尽力帮助您。 2023/8/19 上午9:51:43 OnCmdMsg 2023/8/19 上午9:52:21 “OnCmdMsg” 是一个在 MFC (Microsoft Foundation Cla…...
插入排序:简单而有效的排序方法
在计算机科学中,排序算法是一个重要且常见的主题,它们用于对数据进行有序排列。插入排序(Insertion Sort)是其中一个简单但有效的排序算法。本文将详细解释插入排序的原理和步骤,并提供Java语言的实现示例。 插入排序的…...
OpenGL之光照贴图
我们需要拓展之前的系统,引入漫反射和镜面光贴图(Map)。这允许我们对物体的漫反射分量和镜面光分量有着更精确的控制。 漫反射贴图 我们希望通过某种方式对物体的每个片段单独设置漫反射颜色。我们仅仅是对同样的原理使用了不同的名字:其实都是使用一张覆盖物体的图像,让我…...
隐私交易成新刚需,Unijoin 凭什么优势杀出重围?
随着区块链技术的普及和发展,全球加密货币用户在持续增长,根据火币研究院公布的数据,2022年全球加密用户已达到 3.2亿人,目前全球人口总数超过了 80亿,加密货币用户渗透率已达到了 4%。 尤其是在 2020 年开启的 DeFi 牛…...
小谈设计模式(12)—迪米特法则
小谈设计模式(12)—迪米特法则 专栏介绍专栏地址专栏介绍 迪米特法则核心思想这里的“朋友”指当前对象本身以参数形式传入当前对象的对象当前对象的成员变量直接引用的对象目标 Java程序实现程序分析 总结 专栏介绍 专栏地址 link 专栏介绍 主要对目…...
Foxit PDF
Foxit PDF 福昕PDF 软件,可以很好的编辑PDF文档。 调整PDF页面大小 PDF文档中,一个页面大,一个页面小 面对这种情况,打开Foxit PDF 右键单击需要调整的页面,然后选择"调整页面大小". 可以选择…...
《Python趣味工具》——ppt的操作(刷题版)
前面我们对PPT进行了一定的操作,并将其中的文字提取到了word文档中。现在就让我们来刷几道题巩固巩固吧! 文章目录 1. 查看PPT(上)2. 查看PPT(中)3. 查看PPT(下)4. PPT的页码5. 大学…...
实战型开发--3/3,clean code
编程的纯粹 hmmm,一开始在这个环节想聊一些具体的点,其实也就是《clean code》这本书中的点,但这个就还是更流于表面; 因为编码的过程,就更接近于运动员打球,艺术家绘画,棋手下棋的过程&#x…...
家用无线路由器如何用网线桥接解决有些房间无线信号覆盖不好的问题(低成本)
环境 光猫ZXHN F677V9 水星MW325R 无线百兆路由器 100M宽带,2.4G无线网络 苹果手机 安卓平板电脑 三室一厅94平 问题描述 家用无线路由器如何用网线桥接解决有些房间无线信号不好问题低成本解决,无线覆盖和漫游 主路由器用的运营商的光猫自带无…...
【Golang】网络编程
网络编程 网络模型介绍 OSI七层网络模型 在软件开发中我们使用最多的是上图中将互联网划分为五个分层的模型: 物理层数据链路层网络层传输层应用层 物理层 我们的电脑要与外界互联网通信,需要先把电脑连接网络,我们可以用双绞线、光纤、…...
南京政府门户网站建设问题/百度推广客服
DedeCMS的HTML更新 为了减轻网站负载,提高搜索引擎的友好度,DedeCMS大多数内容都需要生成HTML,一般的操作如下: (推荐学习:dedecms教程) 1、发布内容(发布时会直接生成文档的HTML) 2、更新内…...
电商网站开发主要技术问题/关于友情链接说法正确的是
当我们想要对应用程序的请求或响应进行一些预处理/后处理时,使用截取过滤器设计模式。 在将请求传递到实际目标应用程序之前,在请求上定义和应用过滤器。 过滤器可以进行请求的认证/授权/日志记录或跟踪,然后将请求传递给相应的处理程序。 以…...
wordpress换服务器/深圳关键词排名seo
看完了“李智慧”老师的《大型网站技术架构-核心原理与案例分析》一书,让我对大型网站的技术架构所遇到的问题,所考虑到的内容,所用到的解决方案有了一个初步的认识。任何的大型网站都是从一个小网站发展而来的,淘宝、京东都不例外…...
深圳网站开发antnw/南宁seo服务优化
首先来看,jquery里自带的,和json相关的函数:1.$.parseJSON : 用来解析JSON字符串,返回一个对象。什么叫“JSON字符串”?比如:var a{name:"aijquery",url:"www.aijquery.cn"};上面定义的变量a是…...
网站建设速成/哪些平台可以打小广告
{"moduleinfo":{"card_count":[{"count_phone":1,"count":1}],"search_count":[{"count_phone":4,"count":4}]},"card":[{"des":"阿里技术人对外发布原创技术内容的最大平台&…...
模板做网站多少钱/巨量算数数据分析
十年之前的1月20日,Michel Schinz宣布了Scala编程语言的第一个实现。在宣布之时,Scala被描述为“一种平滑地集成了面向对象编程和函数式编程的语言”,而且“是为以简洁、优雅且类型安全的方式表达常见编程模式而设计的”。\u0026#xD;当时是这…...