当前位置: 首页 > news >正文

Linux学习之高级IO

之前的内容我们基本掌握了基础IO,如套接字,文件描述符,重定向,缓冲区等知识都是文的基本认识,而高级IO则是指更加高效的IO。

对于应用层,在读写的时候,本质就是把数据写给OS,若一方为空,就会进行阻塞式等待,读写的本质也就是数据的拷贝。

所以高效的 IO就需要在多线程的情况下,单位时间的的拷贝的数据更多,等待的比重越小。

目录

五种IO模型

非阻塞式IO

多路复用(多路转接)

IO多路转接之select

初识select

 select函数原型

IO多路转接之epoll

1.认识有关epoll的接口

2.epoll原理

3.编写epoll

4.epoll的工作模式

注意:

读写的改进 reactor


五种IO模型

 当前有五种IO模型:

我们以钓鱼佬钓鱼为例:(鱼竿:文件描述符) (上钩:读写就绪)(鱼:数据)

1.钓鱼佬张三,在完成打窝,之后选择好地点就开始钓鱼,在等待的过程中,雷打不动,任何人都干扰不到他,他死死的盯着鱼漂,一有动静,钩被咬了就提竿上鱼。这种等待的方式我们称为阻塞式IO,这也是大部分常用的读写接口的方式

2.钓鱼佬李四是一个资深的钓鱼佬,在完成准备工作时,李四就先观察了鱼漂,发现没动静,就拿起了手机刷视频,看一会儿之后再检查鱼漂发现上钩了,于是提杆上鱼。李四每隔一段时间进行检查,如果满足条件就读写,反之就干自己的事。这种方式称为非阻塞式IO--非阻塞轮询

3.钓鱼佬王五,拥有较为先进的鱼竿,完成准备工作后,直接就把鱼竿插入地上,一心一意的干其它事,鱼竿上有报警装置,一旦上鱼,立刻发送警报告知王五,所以王五只需要看有没有报警信号。这种方式被称为信号驱动式IO

4.赵六,身家万贯的热爱钓鱼的钓鱼佬,秉持着杆多鱼多的准则,直接准备了一卡车鱼竿,依次完成准备工作,入水后准备开钓,由于鱼竿数量多,赵六在岸上来回检查看哪只鱼竿上货了。这种方式称为多路复用(多路转接)。

5.田七,以吃鱼为爱好的世界五百强上市公司CEO,司机小王拉着田七来钓鱼,刚准备钓鱼,由于公司业务繁忙,对钓鱼不是很感冒,田七又回到了公司处理,且告知小王车上装备齐全,你在这里钓鱼,掉满了之后再给我打电话,我来接你。对于田七来说,这种方式称为异步IO

首先我们来看看阻塞式与非阻塞式 IO:

两者在效率上基本一样(有人觉得非阻塞式效率会高一点,这指的是非阻塞可以去干别的事,但是对于IO效率(等+拷贝)是一样的),区别是等的方式不同,一个一直等,一个间隔等一会儿。

再看同步与异步:

同步:参与等或者参数与了拷贝,都可以是同步IO。异步:两者都没参与,只拿数据。

那么综上:哪一种效率更高呢?

实际上就是田七,多路复用IO。我们学习的重点也就是多路复用.

非阻塞式IO

fcntl
一个文件描述符 , 默认都是阻塞 IO,通过函数fcntl可以修改文件描述符的属性。
函数原型如下
#include <unistd.h>
#include <fcntl.h>
int fcntl(int fd, int cmd, ... /* arg */ );
传入的 cmd 的值不同 , 后面追加的参数也不相同 .
fcntl 函数有 5 种功能 :
复制一个现有的描述符( cmd=F_DUPFD .
获得 / 设置文件描述符标记 (cmd=F_GETFD F_SETFD).
获得 / 设置文件状态标记 (cmd=F_GETFL F_SETFL).
获得 / 设置异步 I/O 所有权 (cmd=F_GETOWN F_SETOWN).
获得 / 设置记录锁 (cmd=F_GETLK,F_SETLK F_SETLKW).
我们此处只是用第三种功能 , 获取 / 设置文件状态标记 , 就可以将一个文件描述符设置为非阻塞
实现函数 SetNoBlock
基于 fcntl, 我们实现一个 SetNoBlock 函数 , 将文件描述符设置为非阻塞 .
void SetNoBlock(int fd) { int fl = fcntl(fd, F_GETFL); if (fl < 0) { perror("fcntl");return; }fcntl(fd, F_SETFL, fl | O_NONBLOCK); 
}
使用 F_GETFL 将当前的文件描述符的属性取出来 ( 这是一个位图 ).
然后再使用 F_SETFL 将文件描述符设置回去 . 设置回去的同时 , 加上一个 O_NONBLOCK 参数

了解到这两个主要的接口,我们就可以简单的实现一下非阻塞:

#include<iostream>
#include<string.h>
#include<unistd.h>
#include<fcntl.h>
#include<cstdio>
using namespace std;//默认阻塞,我们设置为非阻塞
void setNonBlock(int fd)
{int f1=fcntl(fd,F_GETFL);if(f1<0){cerr<<"get failed"<<endl;}//设置标记位,实际上是在原基础上新增一个标记位fcntl(fd,F_SETFL,f1 | O_NONBLOCK);//以非阻塞式打开cout<<"set O_NONBLOCK succed"<<endl;
}int main()
{char buffer[1024];//缓冲区while(true){printf("please enter#");fflush(stdout);setNonBlock(0);sleep(1);ssize_t size=read(0,buffer,sizeof(buffer)-1);//从标准输入中读取if(size>0){buffer[size-1]=0;cout<<"读取到数据 :"<<buffer<<endl;}else if(size==0){cout<<"read done"<<endl;break;}else{cerr<<errno<<":"<<strerror(errno)<<endl;}}return 0;
}

一般阻塞式,系统就会等待着我们输入数据后,才进行下一步工作(这里是打印输入的字符)。

我们设置为非阻塞式有几点需要注意:

1.设置为非阻塞,系统不会等待着我们输出,才打印,而是以读取到数据<0打印错误信息。

2,出错非两种情况:第一个就是fd真的异常 ,第二个就是底层还没就绪,你就返回了。(这里是第二种)。

3.非阻塞也能获取到数据并打印,不过给你就绪的时间非常短。

对于第二点,如何区分是还没就绪还是fd异常,可以通过打印错误码而知晓。

多路复用(多路转接)

实际上对于阻塞式,非阻塞式,非阻塞式轮询这些等待和拷贝一个函数就会搞定了,但是对于多路转接,因为要提高IO效率,因此等待与拷贝是分开的的,多路复用的本质就是非阻塞轮询,因此我们需要将非阻塞和轮询逐一实现。

IO多路转接之select

初识select

系统提供 select 函数来实现多路复用输入 / 输出模型 .
select 系统调用是用来让我们的程序监视多个文件描述符的状态变化的 ;
程序会停在 select 这里等待,直到被监视的文件描述符有一个或多个发生了状态改变

 select函数原型

 select的函数原型如下: #include <sys/select.h>

int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

其中有三个参数类型为fd_set,位图类型,为输入输出型参数。

为输入型参数时:用户告诉内核,我给一些fd,你帮我注意观察一些fd的读事件,如果读事件就绪,你就告诉我

为输出型参数时:内核告诉用户,你给我传来的fd,我已经帮你知晓了有哪些已经就绪(进行位图修改),你可以读了。

至于为什么用位图,因为位图本质就是设置某个数的第几位为0/1,举例:如果此时内核发现0号和1号文件被描述符已经就绪,就会把第0位和第1位比特位置为1, (比特位的位置代表对应文件描述符,从右向左)。比特位的内容就可以表示内核是否要关心。

作为输入型位图:

比特位的位置代表文件描述符

比特位的内容代表是否内核需要关心

 作为输出型位图:

比特位的位置还是文件描述符

比特位的内容代表用户关心的fd的读事件就绪了

于是就提供了一系列位图的接口用来修改为位图。

  其中timeout为一个结构体,里面有两个成员:一个代表秒,一个代表微秒。、

如果设置了timeout,在该时间内如果有已经就绪的,立即返回,之后的timeout表示剩下的时间。

如果没设置,只要有一个就绪了,就返回。

select的返回值:

  1. 返回值

    • 正常情况下,返回已准备好的文件描述符的个数。
    • 经过超时时长后仍无设备准备好,返回值为 0。
    • 如果 select 被某个信号中断,返回 -1 并设置 errno 为 EINTR
    • 如果出错,返回 -1 并设置相应的 errno

对于服务端来说,使用多路复用可以提高IO效率,当没有用访问服务端使,此时select的个数为零,但一有客户端访问,就会有对应的文件描述符(接收缓冲区)被创建,并且添加进行状态检测。

select.hpp

#include "Socket.hpp"
#include <sys/select.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
#include <iostream>const int max_fd = (sizeof(fd_set)) * 8; // 8个位图最多存储的bit位
static const uint32_t defaultport = 8080;
class SelectServer
{
public:SelectServer(const uint32_t Port = defaultport) : port(Port){// 对辅助数组初始化为-1for (int i = 0; i < 1024; i++){fd_arry[i] = defaultfd;}}~SelectServer(){_listenfd.Close();}bool Init(){_listenfd.Createsockfd();_listenfd.Bind(port);_listenfd.Listen();return true;}void Accept(){// 连接就绪,获取新连接可以通信了std::string clientip;uint16_t clientport;int sockfd = _listenfd.Accept(&clientip, &clientport); // 就绪了,不在阻塞if (sockfd < 0)return;std::cout << "获取连接成功"<< "描述符:" << sockfd << std::endl;// 获取之后不能进行读取,而是设置select,不然还是会阻塞在select中int pos = 1;for (; pos < max_fd; pos++){if (fd_arry[pos] != defaultfd){continue;}else{break;}}if (pos == max_fd){std::cout << "can not handle" << std::endl;close(sockfd); // 无法处理就关掉接收的文件描述符}else{// 找到了空闲位置,此时放到fd_arry里面管理fd_arry[pos] = sockfd;}}void recver(int fd,int i){// 不在位图中代表的是接收后新的fdchar buffer[1024];ssize_t n = read(fd, buffer, sizeof(buffer) - 1);if (n > 0){std::cout << "read meesage:" << buffer << std::endl;}else if(n==0){std::cout<<"client closed" << std::endl;//如果为空,说明要关闭连接了close(fd);fd_arry[i]=defaultfd;}else{std::cout<<"read warning"<<std::endl;close(fd);fd_arry[i]=defaultfd;}}void Disapacher(fd_set fds){// 为了确定就绪的文件描述符。需要遍历整个数组for (int i = 0; i < max_fd; i++){int fd = fd_arry[i];if (fd == defaultfd){continue;}// 先判断文件描述符是否在位图中if (fd == _listenfd.Fd() && FD_ISSET(fd, &fds)) // 判断是否是位图中的,是就代表就绪{Accept();}else{recver(fd,i);}}}void Start(){int listensock = _listenfd.Fd(); // 获取套接字的文件描述符fd_arry[0] = listensock;fd_set rfds;                     // 读的文件描述符位图FD_ZERO(&rfds);                  // 先进行清空struct timeval timeout = {10, 0}; // 设置时间戳,每隔五秒检验一次int fdmax = fd_arry[0];while (true){for (int i = 0; i < max_fd; i++){if (fd_arry[i] == defaultfd){continue;}else{// 如果不为-1,说明该位置有要检测的文件描述符FD_SET(fd_arry[i], &rfds); // 将指定的文件描述符添加进来(交给select进行检测)// 同时找出最大的文件描述符if (fd_arry[i] > fdmax){fdmax = fd_arry[i];std::cout << "max has updated:" << fdmax << std::endl;}}}// 这里不能直接进行accept,accept的本质就是检测并获取listenfd上面的事件,accept只能阻塞等一个// 因为我们要实现多路复用,所以这里是要去等待多个套接字并检测他们的状态,使用selectint n = select(fdmax + 1, &rfds, nullptr, nullptr, &timeout); // 通过内核的select帮我进行检测,刚开始启动只有一个listenfdswitch (n){case 0:std::cout << "time out" << std::endl;break;case -1:std::cout << "select error" << std::endl;default:// 有链接了,如果对链接不处理,select会一直通知你,需要处理并设置位图// 新连接到来我们认为是读事件,Disapacher(rfds);//名为事件派发器break;}}}private:Sock _listenfd;uint32_t port;int fd_arry[max_fd]; // 辅助数组,统计所有描述符
};

 main.cc

#include"SelectSever.hpp"
#include<memory>
#include<iostream>void usehelp(char*s)
{printf("use correct code:%s port[1024]\n",s);
}
int main(int argc,char*argv[])
{std::unique_ptr<SelectServer>  server(new(SelectServer));server->Init();server->Start();return 0;
}

编写select的思路:

1.首先创建,绑定,监听套接字,之后创建文件描述符数组对所有文件描述符管理。

2.插入最开始的listenfd到数组里,遍历数组找出最大的文件描述符,设置位图用来标识就绪的文件描述符,之后select数组最大fd+1文。

2.select之后判断是否有新链接,有新的连接(这里我们默认是读事件)就会有新的文件描述符,对事件进行处理。

3.处理时,数组里的文件描述符分两种,一种使位图中的就绪的fd,一种是就绪后进行accept的新的fd,因此对整个数组进行判断,是位图中的,就行新接受新连接,是accept的进行读事件,反之是-1,continue。

但是select是有很明显的缺陷:

1.等待的fd是有上限的--1024

2.输入输出参数较多,数据拷贝频率高

3.输入输出参数多,每一次都对需要关心的fd重置以达到复用

4.管理数组fd,有太多次进行遍历

为了解决种种情况,我们使用了下一种接口(poll):

poll 函数接口
说明
fds 是一个 poll 函数监听的结构列表 . 每一个元素中 , 包含了三部分内容 : 文件描述符 , 监听的事件集合 , 返 回的事件集合.
nfds 表示 fds 数组的长度 .
timeout 表示 poll 函数的超时时间 , 单位是毫秒 (ms).
返回结果
返回值小于 0, 表示出错 ;
返回值等于 0, 表示 poll 函数等待超时 ;
返回值大于 0, 表示 poll 由于监听的文件描述符就绪而返回

这里我们介绍一下,重点放在select_poll上。

IO多路转接之epoll

epoll的说明是:为了处理大批量句柄而改进的poll,

1.认识有关epoll的接口

epoll_create 创建一个epoll模型

 epoll_wait  等待多长时间进行一次事件检查 返回就绪的fd与event

 返回值位已经准备就绪的个数。对于event的宏表示的就绪时间,类型如下:

EPOLLIN : 表示对应的文件描述符可以读 ( 包括对端 SOCKET 正常关闭 );
EPOLLOUT : 表示对应的文件描述符可以写 ;
EPOLLPRI : 表示对应的文件描述符有紧急的数据可读 ( 这里应该表示有带外数据到来 );
EPOLLERR : 表示对应的文件描述符发生错误 ;
EPOLLHUP : 表示对应的文件描述符被挂断 ;
EPOLLET : EPOLL 设为边缘触发 (Edge Triggered) 模式 , 这是相对于水平触发 (Level Triggered) 来说的 .
EPOLLONESHOT :只监听一次事件 , 当监听完这次事件之后 , 如果还需要继续监听这个 socket 的话 , 需要
再次把这个 socket 加入到 EPOLL 队列里 .

epoll_ctl  作用是对系统的文件描述符的事件增加,删除,修改。即对文件描述符做管理

 第一个参数就是epoll_create的返回值,op代表三个选项(增加,修改,删除),第三个代表那个fd对应的哪个事件。

对于epoll的使用时要使用这三个系统接口的。

2.epoll原理

当某一进程调用 epoll_create 方法时, Linux 内核会创建一个 eventpoll 结构体,这个结构体中有两个成 员与epoll 的使用方式密切相关
每一个 epoll 对象都有一个独立的 eventpoll 结构体,用于存放通过 epoll_ctl 方法向 epoll 对象中添加进来 的事件.
这些事件都会挂载在红黑树中,如此,重复添加的事件就可以通过红黑树而高效的识别出来 ( 红黑树的插 入时间效率是lgn ,其中 n 为树的高度 ).
而所有添加到 epoll 中的事件都会与设备 ( 网卡 ) 驱动程序建立回调关系,也就是说,当响应的事件发生时 会调用这个回调方法.
这个回调方法在内核中叫 ep_poll_callback, 它会将发生的事件添加到 rdlist 双链表中 .
epoll 中,对于每一个事件,都会建立一个 epitem结构体

实际上epoll和poll是两个不同的模型,没太多关系,epoll是专门为用户设计的一个底层事件调度的模型。

3.编写epoll

对于select来说,我们使用一个系统调用接口加上文件描述符数组来进行非阻塞式的等待。

那么对于epoll来说就是通过三个系统调用接口,1.创建模型,3.进行非阻塞式等待(时间自己设置),2.使用epoll_ctl管理事件(本质上时使用红黑树的节点绑定事件进行高效的管理)。

每一个事件和她的描述符都被封装在一个结构体epoll_event当中,之后通过一个数组管理这里epol_event。

第一个不太完美的版本的eopp如下:

epoler.hpp

//封装epoll接口
#pragma once
#include <sys/epoll.h>
#include <unistd.h>
#include<iostream>
#include<string.h>
#include"Nocopy.hpp"uint32_t EVENT_IN=(EPOLLIN); //读事件
uint32_t EVENT_OUT=(EPOLLOUT);//写事件//因为我们不想该类被拷贝,所以继承不被拷贝的类
class Epoll :public nocopy
{public:static const int size=128;Epoll():_timeout(3000)     //设置检测时间3s一次,若为零就是非阻塞等待{//创建epoll模型_epfd=epoll_create(size);if(_epfd==-1){std::cout<<"errno: "<<errno<<"error message:"<<strerror(errno)<<std::endl;}else{std::cout<<"create succed ,epfd:"<<_epfd<<std::endl;}}//使用epoll接口进行等待int Epollwait(struct epoll_event events[],int num) //输入参数events表示就序的事件 num表示个数{int ret=epoll_wait(_epfd,events,num,_timeout);//就绪了就从这里拿}int Epollctl(int oper,int sock,uint32_t event){//向指定模型根据oper添加修改删除事件if(oper==EPOLL_CTL_DEL){//这里对删除事件进行单独处理int ret=epoll_ctl(_epfd,oper,sock,nullptr);}else{//注册事件struct epoll_event ev; //创建事件结构体并设置ev.events=event;//设置事件ev.data.fd=sock;//设置fdint ret=epoll_ctl(_epfd,oper,sock,&ev);//进行什么操作呢  本质就是向红黑树新增节点,节点绑定事件if(ret==-1){std::cout<<"errno: "<<errno<<"error message:"<<strerror(errno)<<std::endl;}else{std::cout<<"add succed ,epfd:"<<_epfd<<std::endl;}return ret;}}~Epoll(){if(_epfd==-1){close(_epfd);std::cout<<"close succed ,epfd:"<<_epfd<<std::endl;}}private:int _epfd;int _timeout; 
};

epollserver.hpp

#pragma once
#include"Socket.hpp"
#include"Epoller.hpp"
#include"Nocopy.hpp"
#include<iostream>
#include<memory>
#include<sys/epoll.h>//设置你要关心的时事件const uint16_t defalutport=8080;
class EpollServer :public nocopy
{public:static const int num=64;//每次从就绪队列中取64个就绪事件EpollServer(uint16_t port=defalutport):_port(port),_listensock(new Sock),_epoller(new Epoll){}bool Init(){_listensock->Createsockfd();_listensock->Bind(_port);_listensock->Listen();return true;}void Start(){//将listensock添加到epoll中检测你关心的读写事件,这里就是添加_epoller->Epollctl(EPOLL_CTL_ADD,_listensock->Fd(),EVENT_IN);struct epoll_event evns[num];//num代表最多就绪的个数,若果等待的个数太多,就会分配批次取waitwhile(true){int n=_epoller->Epollwait(evns,num); //返回值代表就绪的个数if(n>0){//在事件插入之后,有事件就绪了std::cout<<"检测对应的fd的事件,fd:"<<evns[0].data.fd<<std::endl; //既然就绪,那就开始处理事件HandlerEvent(evns,n);}else if(n==0){std::cout<<"time out"<<std::endl;}else{std::cout<<"wait error"<<std::endl;}}}void Accepter(){std::string clientip;uint16_t clientport;int sock =_listensock->Accept(&clientip,&clientport);if(sock>0){//获取到新连接再插入到红黑树当中_epoller->Epollctl(EPOLL_CTL_ADD,sock,EVENT_IN);}else{std::cout<<"accept error"<<std::endl;}}void Dispatcher(int fd){char buffer[1024];// 已经获取新连接,开始readint n = read(fd, buffer, sizeof(buffer) - 1);if (n > 0){buffer[n] = 0;std::cout << "读取到数据:" << buffer << std::endl;//读完之后写回给客户端std::string echo_string="server get message";std::string content=echo_string+std::string(buffer);write(fd,content.c_str(),content.size());}else if(n==0){std::cout << "client close:"<< std::endl;//删除对应的文件描述符 _epoller->Epollctl(EPOLL_CTL_DEL,fd,0); //从红黑树中拿出去,再关闭fdclose(fd);}else{std::cout << "read error!!" << std::endl;_epoller->Epollctl(EPOLL_CTL_DEL,fd,0); //从红黑树中拿出去,再关闭fdclose(fd);}}//这里的事件假设就两种,读 写void HandlerEvent( struct epoll_event evns[],int num){for(int i=0;i<num;i++){uint32_t event=evns[i].events;int fd=evns[i].data.fd; //判断是哪一种事件if(event&EVENT_IN){//判断fd是等待的fd,还是已经获取新连接的fdif(fd==_listensock->Fd()){Accepter();}else{Dispatcher(fd);}}else if(event & EVENT_OUT){//写事件就绪}}}~EpollServer(){_listensock->Close();}private:std::shared_ptr<Sock> _listensock;std::shared_ptr<Epoll> _epoller;uint16_t _port;};

4.epoll的工作模式

epoll的工作模式有利各种:ET 和LT

LT(level triggered)工作模式--水平工作模式

epoll 默认状态下就是 LT 工作模式 .
1.当 epoll 检测到 socket 上事件就绪的时候 , 可以不立刻进行处理 . 或者只处理一部分 .
如上面的例子 , 由于只读了 1K 数据 , 缓冲区中还剩 1K 数据 , 在第二次调用 epoll_wait , epoll_wait 仍然会立刻返回并通知socket 读事件就绪 .
直到缓冲区上所有的数据都被处理完 , epoll_wait 才不 会立刻返回 .
2.支持阻塞读写和非阻塞读写

 通俗点说,如果上层有新的数据,你不拿走,上层就一直通知你让你取数据,只要有就拿走。

ET(Edge Triggered)工作模式  --边缘工作模式

如果我们在第1步将socket添加到epoll描述符的时候使用了EPOLLET标志, epoll进入ET工作模式.

epoll 检测到 socket 上事件就绪时 , 必须立刻处理 .
如上面的例子 , 虽然只读了 1K 的数据 , 缓冲区还剩 1K 的数据 , 在第二次调用 epoll_wait 的时候 ,
epoll_wait 不会再返回了 .
也就是说 , ET 模式下 , 文件描述符上的事件就绪后 , 只有一次处理机会 .
ET 的性能比 LT 性能更高 ( epoll_wait 返回的次数少了很多 ). Nginx 默认采用 ET 模式使用 epoll.
只支持非阻塞的读写

通俗点说,只有发生变化(从没链接到有新连接,从一个变多个,从多个变没),就会进行通知取走数据,只有变化,才会拿走。

由于通知的次数少,且通知是有限的,所以一般而言ET的工作效率更高一些,但也因为之中特性,程序员必须每次把数据都取完(循环读取,直到读取出错),否则就会遗漏。

但是读完数据,我们此时的read是阻塞的,我们不能让她阻塞,这会导致服务器挂起(这是不好的),所以我们需要设置读是非阻塞的。直到非阻塞状态下还读完了,就会返回错误码。

且由于我们把缓冲区的数据都把走了,这一位则TCP通信时,窗口就更大了,一次拿去的数据也多了。

注意:

即使如此,一般情况ET比LT高效,但是你是一直比LT高效吗,而且既然你ET都能一次性非阻塞的读取完,我LT不行吗?只是我们一般去使用ET,如果可以,你也可以去使用LT修改。

读写的改进 reactor

所以我们的读写是不合理的,首先没有协议的定制,其次读取并不是非阻塞的。

这里就还是以编写计算器的客户端与服务端为例:

重要的读写部分在TcpServer.hpp中:

TcpServer.hpp

#pragma once
#include<string>
#include<iostream>
#include<memory>
#include<functional>
#include<unordered_map>
#include <arpa/inet.h>
#include"Nocopy.hpp"
#include"Socket.hpp"
#include"Epoller.hpp"
#include"Comen.hpp"
#include"Protocol.hpp"
#include"Calculator.hpp"class Connection;
class TcpServer;
using funct=std::function<void (std::shared_ptr<Connection>)>; //定义了返回值为void,参数为td::shared_ptr<Connection>指针的包装器
const static int buffersize=1024;
const uint16_t defaultport=8080;                                                                                                    
const int num=64; 
class Connection
{public:void SetHandle(funct &recv_cb,funct &send_cb,funct &except_cb){_recver_callback=recv_cb;_send_callback=send_cb;_except_callback=except_cb;}void Appendinbuffer(std::string buffer){_inbuffer+=buffer;//输入缓冲区}void Appendoubuffer(std::string buffer){_outbuffer+=buffer;}const std::string &inbuffer(){return _inbuffer;}Connection(int sock,std::shared_ptr<TcpServer> tcpserver_ptr):_sock(sock),_tcpserver_callback(tcpserver_ptr)//回值指针用来传递TCPserver{}int getsock(){return _sock;}std::string& getinbuf(){return _inbuffer;}std::string& getoutbuf(){return _outbuffer;}~Connection(){}private:int _sock;private:std::string _inbuffer;//输入缓冲区std::string _outbuffer;//输出缓冲区public:funct _recver_callback; //接收回调funct _send_callback;   //发送回调funct _except_callback; //异常回调//tcpserver回值指针std::shared_ptr<TcpServer> _tcpserver_callback;
};class TcpServer
{public:TcpServer(funct OnMessage,uint16_t port=defaultport):_epoll_server(new Epoll),_listensocket(new Sock),_port(port),_quit(true)//回调函数用来设置处理事件{_OnMessage=OnMessage;_listensocket->Createsockfd();_listensocket->Bind(_port);_listensocket->Listen();SetNonBlock(_listensocket->Fd()); //将文件设置为非阻塞等待AddConnection(_listensocket->Fd(),EVENT_IN,std::bind(&TcpServer::Accepter,this,std::placeholders ::_1),nullptr,nullptr);}void Init(){}void Accepter(std::shared_ptr<Connection> connection){//连接到来开始接受while(true){struct sockaddr_in peer;socklen_t len=sizeof(peer);int fd=::accept(connection->getsock(),(sockaddr*)&peer,&len); //获取新链接if(fd>0){uint16_t clientport=ntohs(peer.sin_port);char ipbuffer[10];inet_ntop(AF_INET,&peer.sin_addr,ipbuffer,sizeof(ipbuffer));std::string ip=std::string(ipbuffer);std::cout<<"get new link,ip:"<<ip<<",fd :"<<fd<<",port"<<clientport<<std::endl;//获取的新连接设置非阻塞,现在是ET模式SetNonBlock(fd);//之后再添加到哈希表中,epoll_event中,AddConnection(fd,EVENT_IN,\std::bind(&TcpServer::Recver,this,std::placeholders::_1),   //根据函数地址与参数bind成一个包装器对象std::bind(&TcpServer::Sender,this,std::placeholders::_1),\std::bind(&TcpServer::Exepter,this,std::placeholders::_1));}else{//直到非阻塞被读到没有新的连接了if(errno==EWOULDBLOCK){break;}else if(errno==EINTR){continue;}else{//读出错break;}}}}void AddConnection(int sock,uint32_t event,funct recv_cb,funct send_cb,funct except_cb){// 将listensock添加到epoll中检测你关心的读写事件,这里就是添加 并且设置为边缘工作模式_epoll_server->Epollctl(EPOLL_CTL_ADD, sock, event);//c除此之外,每一个fd都要构建成Connetcion对象,通过connect管理事件的处理std::shared_ptr<Connection> new_connection=std::make_shared<Connection>(sock,std::shared_ptr<TcpServer>(this));new_connection->SetHandle(recv_cb,send_cb,except_cb);//第三步,就是添加到unordered_map_connection_map[sock]=new_connection;}//事件管理,进行事件的处理void Recver(std::shared_ptr<Connection> connection){int sock=connection->getsock();//ET模式读while(true){char buffer[buffersize];memset(buffer,0,sizeof(buffer));ssize_t n=recv(sock,buffer,sizeof(buffer)-1,0); //这里的数据默认为字符流,如果还有二进制,需要换为vector if(n>0){//读取成功connection->Appendinbuffer(buffer);}else if(n==0){//连接关闭std::cout<<"link closed !"<<std::endl;connection->_except_callback(connection);}else{//读取错误if(errno==EWOULDBLOCK){break;}else if(errno==EINTR){continue;}else{std::cout<<"read error!"<<std::endl;connection->_except_callback(connection);break;}}}//读完之后,将数据交给上层_OnMessage(connection); //协议的定制}void Sender(std::shared_ptr<Connection> connection){while(true){std::string &buffer=connection->getoutbuf();int sockfd=connection->getsock();ssize_t n=send(sockfd,buffer.c_str(),buffer.size(),0);if(n>0){//清空缓冲区buffer.erase(0,n);if(buffer.empty())break;}else if(n==0){return ;}else {if(errno==EWOULDBLOCK){break;}else if(errno==EINTR){continue;//信号中断,跳过本次}else {//发失败了,进入异常处理std::cout<<"send error"<<std::endl;connection->_except_callback(connection);return;}}if(!buffer.empty()){//对写事件的关心EnableEvent(connection->getsock(),true,true);}else{//关闭对写事件的处理EnableEvent(connection->getsock(),true,false);}}}void EnableEvent(int fd,bool readable,bool writeable ){uint32_t event=0;event |=((readable ? EPOLLIN:0)|(writeable ? EPOLLOUT : 0)|EPOLLET);_epoll_server->Epollctl(EPOLL_CTL_MOD,fd,event);//修改事件的读写,如果没发完,就继续发,通过修改事件为写}void Exepter(std::shared_ptr<Connection> connection){//处理链接异常或着断开std::cout<<"事件异常,断开连接!"<<std::endl;//首先移除链接//EnableEvent(connection->getsock(),false,false);//读写都设置为False_epoll_server->Epollctl(EPOLL_CTL_DEL,connection->getsock(),0);//移出事件 //关闭异常的文件描述符close(connection->getsock());std::cout<<"已移除connection,已关闭文件描述符:"<<connection->getsock()<<std::endl;//把map中的也删了}bool isConnectionsafe(int fd){auto it=_connection_map.find(fd);if(it==_connection_map.end()){return false;}return true;}void Disptcher(int timeout){//进行等待就绪时间int n=_epoll_server->Epollwait(events,num,timeout);for(int i=0;i<n;i++){uint32_t event=events[i].events;int sock=events[i].data.fd;//进行事件判断,并转化成读写事件if((event & EVENT_IN)&& isConnectionsafe(sock)){//读就绪if(_connection_map[sock]->_recver_callback)//函数存在{_connection_map[sock]->_recver_callback(_connection_map[sock]);//fd对应的connetcion信息,其中调用对应的方法,参数为connection对象}}if((event & EPOLLOUT)&& isConnectionsafe(sock)){//写就绪if(_connection_map[sock]->_recver_callback){_connection_map[sock]->_send_callback(_connection_map[sock]);}}if(event & EPOLLHUP){//读关闭event |=(EPOLLIN & EPOLLOUT);}if(event &EPOLLERR){//错误event |=(EVENT_IN & EVENT_OUT);}}}void Loop(){_quit=false;while (!_quit){Disptcher(3000);//3s}_quit=true;}~TcpServer(){}private:std::shared_ptr<Epoll> _epoll_server;  //epoll接口std::unordered_map<int,std::shared_ptr<Connection>> _connection_map;  //连接池std::shared_ptr<Sock> _listensocket; //套接字接口struct epoll_event events[num];uint32_t _port;bool _quit; //是否退出funct _OnMessage;//上层回调处理信息}; 

剩余代码的所在地:reactor · 但成伟/编程学习 - 码云 - 开源中国 (gitee.com)

主要难点是对事件与链接的管理,事件的执行,读写非阻塞这几点。

相关文章:

Linux学习之高级IO

之前的内容我们基本掌握了基础IO&#xff0c;如套接字&#xff0c;文件描述符&#xff0c;重定向&#xff0c;缓冲区等知识都是文的基本认识&#xff0c;而高级IO则是指更加高效的IO。 对于应用层&#xff0c;在读写的时候&#xff0c;本质就是把数据写给OS&#xff0c;若一方…...

一分钟了解Polysciences PEI 40K转染试剂的原理

在细胞实验中&#xff0c;细胞转染大概是最常用、最基础的实验技能。转染细胞的方法很多&#xff0c;而PEI作为带有高电荷阳离子的多聚物&#xff0c;非常容易结合带负电荷的DNA分子&#xff0c;形成复合物&#xff0c;在HEK293和CHO等细胞中转染效率较高&#xff0c;常用于大规…...

Clickhouse IP 函数

IPv4NumToString(num)​ 将数字类型ip转换为IPv4格式。 Takes a UInt32 number. Interprets it as an IPv4 address in big endian. Returns a string containing the corresponding IPv4 address in the format A.B.C.d (dot-separated numbers in decimal form). Alias: …...

【Python】numpy.ptp()

numpy.ptp() 函数是 NumPy 库中的一个有用函数&#xff0c;用于计算数组中的“峰到峰”&#xff08;peak-to-peak&#xff09;值&#xff0c;即数组中的最大值与最小值之差。这个函数可以帮助快速评估数组中数据的变化范围&#xff0c;常用于信号处理、数据分析等领域中&#x…...

The provided password or token is incorrect or your account

IDEA使用git技巧 【/n】 01 问题出现场景 我的gitlab上个月生成的token到期了,于是今天推上去的时候报了这个错误 The provided password or token is incorrect or your account has 2FA enabled and you must use a personal access token instead of a password. See ht…...

常见的shell命令

以下是一些常见的shell命令&#xff1a; cd&#xff1a;改变当前目录&#xff1b;ls&#xff1a;列出目录中的文件和子目录&#xff1b;mkdir&#xff1a;创建一个新的目录&#xff1b;touch&#xff1a;创建一个新的空文件或更新已存在的文件的时间戳&#xff1b;rm&#xff…...

堆栈打印跟踪Activity的启动过程(基于Android10.0.0-r41),framework修改,去除第三方app的倒计时页面

文章目录 堆栈打印跟踪Activity的启动过程(基于Android10.0.0-r41)&#xff0c;framework修改&#xff0c;去除第三方app的倒计时页面1.打印异常堆栈2.去除第三方app的倒计时页面3.模拟点击事件跳过首页进入主页 堆栈打印跟踪Activity的启动过程(基于Android10.0.0-r41)&#x…...

只允许内网访问时,如何设置hosts

1、Hosts文件简介 hosts文件是一个没有扩展名的计算机文件&#xff0c;用于将主机名与对应的 IP 地址关联起来。在操作系统中&#xff0c;hosts文件通常用于在本地解析域名&#xff0c;以便将域名映射到特定的IP地址。这个文件可以用来屏蔽广告、加速访问特定网站、解决DNS解析…...

nature《自然》期刊文献怎么在家查看下载

nature《自然》期刊我们都知道&#xff0c;是世界上历史悠久的、最有名望的科学杂志之一。下载该期刊文献是需要使用权限的&#xff0c;如果你没有nature《自然》期刊的资源&#xff0c;又该如何获取呢&#xff1f;请看本文的经验分享。 一、先百度“文献党下载器” 在文献党下…...

python作业五

题目&#xff1a;注册登录 制作一个注册登录模块 注册&#xff1a;将用户填入的账户和密码保存到一个文件(users.bin) 登陆&#xff1a;将用户填入账户密码和users.bin中保存的账户密码进行比对,如果账户和密码完全相同 那 么登录成功&#xff0c;否则登录失败…...

经典的设计模式和Python示例(一)

目录 一、工厂模式&#xff08;Factory Pattern&#xff09; 二、单例模式&#xff08;Singleton Pattern&#xff09; 三、观察者模式&#xff08;Observer Pattern&#xff09; 一、工厂模式&#xff08;Factory Pattern&#xff09; 工厂模式&#xff08;Factory Pattern…...

Ubuntu服务器如何安装桌面

更新软件库 apt-get update 升级软件 apt-get upgrade 安装ubuntu桌面系统 apt-get install ubuntu-desktop 运行过程需要手动确认两次&#xff0c;选择 Y。 安装完成之后&#xff0c;终端输入 reboot&#xff0c;重启服务器。...

填报表如何实现电话号码的校验

单元格校验时&#xff0c;只能输入数字和特定字符&#xff08;-&#xff09;&#xff0c;即实现固话和手机号码的校验&#xff0c;保证录入的规范&#xff0c;应如何实现&#xff1f; 解决方案&#xff1a;使用正则表达式实现校验效果&#xff0c;如下图所示&#xff1a; 校验…...

揭秘全网热门话题:抖音快速涨粉方法,巨量千川投流助你日增10000粉

在当今社交媒体的时代( 千川投流&#xff1a;hzzxar&#xff09;抖音成为了年轻人分享自己才华和生活的平台。然而&#xff0c;要在抖音上快速获得关注和粉丝&#xff0c;却不是一件容易的事情。今天&#xff0c;我们将揭秘全网都在搜索的抖音快速涨1000粉的秘籍&#xff0c;带…...

电脑提示‘找不到msvcr110dll,无法继续执行代码’的解决方法,3分钟快速修复

不知道大家有没有遇到过这种情况&#xff0c;无端端电脑提示你找不到msvcr110dll,无法继续执行代码&#xff1f;当出现这个情况&#xff0c;证明你的某个程序就已经运行不了&#xff0c;你需要去修复这个错误&#xff0c;才能正常的运行程序&#xff0c;下面我们一起来详细的了…...

如何在Hostease的Linux虚拟主机上永久移除WordPress网站

最近有遇到客户咨询如何移除Linux虚拟主机上的WordPress网站的&#xff0c; 因为原先的站点长时间不更新&#xff0c;被恶意篡改&#xff0c;跳转到了一个博彩网站上&#xff0c;本身网站也比较旧了&#xff0c;客户也不准备修复&#xff0c;准备重新建站。但是又怕移除不干净&…...

【云原生】Docker 的网络通信

Docker 的网络通信 1.Docker 容器网络通信的基本原理1.1 查看 Docker 容器网络1.2 宿主机与 Docker 容器建立网络通信的过程 2.使用命令查看 Docker 的网络配置信息3.Docker 的 4 种网络通信模式3.1 bridge 模式3.2 host 模式3.3 container 模式3.4 none 模式 4.容器间的通信4.…...

如何优雅的实现浏览器多标签通讯

前言 开发过程中无法避免遇到需要进行多标签通讯的情况&#xff0c;例如&#xff1a; 管理员登陆后&#xff0c;其他打开标签的页面登陆状态要变更课堂页面只能打开一个&#xff0c;另一个则通知失效等等。。。场景 然而实现该功能&#xff0c;我们需要使用页面能共同持有的…...

刷题之不相同的字符串(卡码网模拟)

卡码网不同的字符串 #include<vector> #include<string> #include<iostream> using namespace std; int main() {int n0;cin>>n;for(int i0;i<n;i){string s;cin>>s;vector<int>hash(26,0);for(int j 0;j < s.size();j)hash[s[j…...

JS-导入导出

export和export default是ES6中导出模块中变量的语法 导入导出变量 //导出方法&#xff08;js文件中&#xff09; export const 变量名值//导入方法 对应导入的变量&#xff0c;一定要加花括号 import {变量名} from js文件路径 导入导出函数 //导出方法&#xff08;js文件中…...

【代码随想录——数组篇】

代码随想录——数组篇 2. 二分查找3. 移除元素4. 有序数组的平方5. 长度最小的子数组6. 螺旋矩阵II 2. 二分查找 力扣题目链接 前提&#xff1a; 有序数组数组中无重复元素 代码&#xff1a; &#xff08;版本一&#xff09;左闭右闭区间 class Solution {public int sea…...

使用 js 类封装项目中音频播放功能的工具方法utils

在前端开发中&#xff0c;音频播放功能是一个常见的需求&#xff0c;我们经常需要在项目中加入音频播放、音频提示等功能。为了提高开发效率和代码复用性&#xff0c;我们可以封装一个工具方法来管理音频播放功能。 在本文中&#xff0c;我将介绍如何封装项目中音频播放功能的…...

【超详细】R语言贝叶斯方法在生态环境领域中的高阶技术应用

查看原文>>>R语言贝叶斯方法在生态环境领域中的高阶技术应用 目录 专题一&#xff1a;前期资料 专题二&#xff1a;R和Rstudio入门和绘图&#xff08;含ggplot&#xff09; 专题三&#xff1a;R语言数据清洗-tidyverse包应用 专题四&#xff1a;贝叶斯回归模型-回…...

Python 正则表达式 re . 符号

Python 正则表达式 re . 符号 正文示例1示例2 正文 用法说明&#xff1a;(点号) 在默认模式下&#xff0c;匹配除换行符以外的任意字符。 如果指定了 flags 参数 DOTALL &#xff0c;它将匹配包括换行符在内的任意字符。 示例1 import restr1 abcde print(re.search(., str…...

智慧监控 高效运维

随着企业IT建设的不断深入和完善&#xff0c;IT管理的重要性逐渐被重视&#xff0c;打通数据割裂&#xff0c;使业务更加充分融合。亟需一套统一的平台来实现跨品牌跨设备类型的集中监控和管理。 LinkSLA带外监控平台&#xff0c;不仅适用于大规模或超大规模的运维场景&#x…...

JAVA每日面试题(一)

Java面试问题及答案 1. 解释Java中的垃圾回收机制和如何优化它 问题&#xff1a; 在Java中&#xff0c;垃圾回收&#xff08;Garbage Collection, GC&#xff09;是如何工作的&#xff1f;作为一名Java开发者&#xff0c;你如何优化垃圾回收以提高应用性能&#xff1f; 答案…...

Java数组创建与使用

一.创建和初始化 1.数组是怎么创建的&#xff1f; 直接举例子&#xff1a; int[] arr new int[10]; 这里只简单的举一个int开辟数组的例子。 可见java数组的创建于C语言是不同的。前面是一个int[ ]就是一个数组的数据类型&#xff0c;后面的arr是数组名&#xff0c;最后[…...

EMAP如何建数据源

新建数据源&#xff1a; EMAP底座的数据源&#xff0c;名称为“DB_EMAP_BIZ_BASE"&#xff0c;不可随意更改. 新建业务系统数据源&#xff0c;名称为”DB_STUDY_BIZ_BASE". 支持的数据库&#xff1a; 支持两种类型数据库&#xff1a;H2 和 oracle 新建H2数据源&am…...

在 Linux 中创建文件

目录 ⛳️推荐 前言 使用 touch 命令创建一个新的空文件 使用 echo 命令创建一个新文件 使用 cat 命令创建新文件 测试你的知识 ⛳️推荐 前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到…...

高德地图+HTML+点击事件+自定心信息窗体

代码如下 <!doctype html> <html><head><meta charset"utf-8"><meta http-equiv"X-UA-Compatible" content"IEedge"><meta name"viewport" content"initial-scale1.0, user-scalableno, width…...