IO知识整理
IO
面向系统IO
page cache
程序虚拟内存到物理内存的转换依靠cpu中的mmu映射
物理内存以page(4k)为单位做分配
多个程序访问磁盘上同一个文件,步骤
- kernel将文件内容加载到pagecache
- 多个程序读取同一份文件指向的同一个pagecache
- 多个程序各自维护各自的fd,fd中seek记录偏移量,指向具体数据
pagecache特点
- 为内核维护的中间层
- 淘汰机制
- 持久化机制,是否会丢失数据
- 占用内存大小
Java输出流
- 不使用buffer的输出流
- 使用buffer的输出流
- 速度超过不使用buffer的输出流,是因为先在jvm内存中写入,默认到8kb写完调用一次systemcall
- 随机输出流
内存充足的情况下,会优先写入pagecache,只要未达到脏页持久化阈值,就不会写入磁盘,不论pagecache里面多少数据,关机重启后将全部丢失;内存不充足的情况下,pagecache中的新数据的写入会导致老数据的持久化,进而写入磁盘。
可以通过手动调用系统flush将脏页写入磁盘
脏页持久化之后cache依然存在于内存,只有内存不够分配时才会淘汰cache,且淘汰的cache一定不是脏页。
ByteBuffer
@Test
public void whatByteBuffer() {// 堆内分配内存// ByteBuffer buffer = ByteBuffer.allocate(1024);// 堆外分配内存ByteBuffer buffer = ByteBuffer.allocateDirect(1024);System.out.println("postition: " + buffer.position());System.out.println("limit: " + buffer.limit());System.out.println("capacity: " + buffer.capacity());// position起点 limit终点 capacity容量System.out.println("mark: " + buffer);// position->3 buffer.put("123".getBytes());System.out.println("-------------put:123......");System.out.println("mark: " + buffer);// 读写交替 position->0 limit->3buffer.flip();System.out.println("-------------flip......");System.out.println("mark: " + buffer);// 读取一个byte pos->1 limit->3buffer.get();System.out.println("-------------get......");System.out.println("mark: " + buffer);// 读写交替 pos->2 limit->1024 已读取的字节删除buffer.compact();System.out.println("-------------compact......");System.out.println("mark: " + buffer);buffer.clear();System.out.println("-------------clear......");System.out.println("mark: " + buffer);
}
mmap
堆外创建一个地址空间,只有文件系统可以直接调用
该内存空间的数据不需要通过系统调用及用户态和内核态的切换,直接写入数据就可以同步
但是依然收到系统page cache限制,存在数据丢失的可能
Direct IO
可以绕过系统控制的page cache,不受系统page cache参数控制
程序自己维护page cache,通过自定义代码逻辑维护一致性/dirty等…
好处是自己维护page cache刷磁盘的阈值,与系统通用配置隔离,但是依然存在丢失数据的逻辑
@Test
public void testRandomAccessFileWrite() throws Exception {RandomAccessFile raf = new RandomAccessFile(path, "rw");raf.write("hello mashibing\n".getBytes());raf.write("hello seanzhou\n".getBytes());System.out.println("write------------");System.in.read();// 指针调整,往前调整后继续写会直接覆盖原有数据raf.seek(4);raf.write("ooxx".getBytes());System.out.println("seek---------");System.in.read();FileChannel rafchannel = raf.getChannel();// mmap 堆外(jvm堆外且是linux的java进程堆外的内存) 和文件映射的 byte not object// 此时文件大小会变为4096kbMappedByteBuffer map = rafchannel.map(FileChannel.MapMode.READ_WRITE, 0, 4096);map.put("@@@".getBytes()); //不是系统调用 但是数据会到达 内核的pagecache//曾经我们是需要out.write() 这样的系统调用,才能让程序的data 进入内核的pagecache//曾经必须有用户态内核态切换//mmap的内存映射,依然是内核的pagecache体系所约束的!!!//换言之,丢数据System.out.println("map--put--------");System.in.read();// 数据刷入磁盘 等于flush// map.force(); raf.seek(0);ByteBuffer buffer = ByteBuffer.allocate(8192);// ByteBuffer buffer = ByteBuffer.allocateDirect(1024);// 将channel中的数据读入buffer 等价于buffer.put()int read = rafchannel.read(buffer); System.out.println(buffer);// 此时翻转后可以开始读取数据 pos->0 limit->4096buffer.flip();System.out.println(buffer);for (int i = 0; i < buffer.limit(); i++) {Thread.sleep(200);System.out.print(((char)buffer.get(i)));}
}
面向网络IO
TCP
面向连接的,可靠的传输协议
- 三次握手
- 四次分手
- 内核级开辟资源,建立连接
- 即使服务端没有accept,也可以建立连接(established状态),只是不分配具体的pid
- 而从java进程内部,看到的连接依然是listen状态
- 此时可以从客户端发送数据,而服务端不能接收,服务端开启accept后可以接收到已发送的数据
三次握手报文
- win 滑动窗口长度 协商结果为115
- tcp拥塞控制,提速
- 服务端窗口已满
- 客户端阻塞不再继续发包(如果继续发包会丢弃后发的数据)
- tcp拥塞控制,提速
- seq 序列号 client->server server将序列号+1作为ack返回给client
- mtu ifconfig可以看到网口字节长度
- mss 实际数据字节长度,基本上为mtu-ip长度-port长度 各自20字节
四次分手
- 异常状态
- CLOSE_WAIT(接收方第三次FIN信号没有发送成功)
- TIME_WAIT (连接关闭后,为防止接收方没有收到最后一次ACK,保留连接对应资源)
- FIN_WAIT2(没有收到FIN信号)
Socket
四元组(cip + sport +sip + sport)
- 服务端不需要给客户端连接分配新的端口号(四元组标识唯一,客户端服务端各存一份)
内核级别
关键配置
-
backlog
-
配置后续队列,超过配置条数+1之后,会将连接状态置为SYNC_REC状态,不可连接
-
应当根据处理速度、cpu条数来进行配置,防止建立过多连接
-
-
nodelay
- false:使用优化,会积攒超过buffer长度的字节一次发到服务端,但是会有延时
- true:不使用优化,根据内核调度尽快发送,会多出几次网络io
-
oobinline
- 结合nodelay配置使用效果明显,会单独发出第一个字节
-
keepalive
- 开启后会保持心跳,判断相互之间连接是否生效
IO模型
模型分类
- 同步:程序自己进行R/W操作
- 异步:内核完成R/W 程序像是不访问IO一样,直接使用buffer,linux不可用
- 阻塞:BLOCKING BIO
- 非阻塞:NONBLOCKING NIO
linux
- 同步阻塞:BIO
- 同步非阻塞:NIO、多路复用
BIO
多线程BIO模型,主线程accept+创建子线程,子线程做recv
public static void main(String[] args) throws Exception {ServerSocket server = new ServerSocket(9090,20);System.out.println("step1: new ServerSocket(9090) ");while (true) {// linux指令 socket() ->fd3 bind(fd3,8090) accept(fd3)->fd5Socket client = server.accept(); //阻塞1System.out.println("step2:client\t" + client.getPort());// 主线程只负责创建子线程用来接收数据new Thread(new Runnable(){public void run() {InputStream in = null;try {in = client.getInputStream();BufferedReader reader = new BufferedReader(new InputStreamReader(in));while(true){// linux指令 recv(fd5) 子线程负责读取数据String dataline = reader.readLine(); //阻塞2if(null != dataline){System.out.println(dataline);}else{client.close();break;}}System.out.println("客户端断开");} catch (IOException e) {e.printStackTrace();}}}).start();}
BIO多线程模型需要创建新线程(系统调用clone)+线程调度,导致速率降低
BIO的弊端主要来自于阻塞,系统内核级别阻塞(accept+recv)
- accept等待连接和recv接收数据会阻塞,所以通过创建子线程的方式进行规避,但是clone指令+线程切换也是有很高成本的,当客户端连接数量增加时,处理速度会明显降低
- 因为recv接收数据会发生阻塞,所以当多个客户端连接的时候只能使用多个线程的方式来进行读取,如果只使用一个线程,当连接阻塞时,没办法读取其他连接发送的数据
NIO
accept指令不会阻塞,如果没有连接会直接返回-1,在Java api中会体现为对象为null
不阻塞的好处
- 可以不额外创建线程,在一个线程中执行建立连接和接收信息两个操作,节省了创建连接和线程切换的成本
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-kpaWtaly-1676460489179)(IO.assets/image-20230209151431908.png)]
public static void main(String[] args) throws Exception {LinkedList<SocketChannel> clients = new LinkedList<>();ServerSocketChannel ss = ServerSocketChannel.open(); //服务端开启监听:接受客户端ss.bind(new InetSocketAddress(9090));ss.configureBlocking(false); //重点 OS NONBLOCKING!!! //只让接受客户端 不阻塞while (true) {//接受客户端的连接Thread.sleep(1000);SocketChannel client = ss.accept(); //不会阻塞? -1 NULL//accept 调用内核了:1,没有客户端连接进来在BIO 的时候一直卡着,但是在NIO不卡着,返回-1,NULL//如果来客户端的连接,accept 返回的是这个客户端的fd 5,client object//NONBLOCKING 就是代码能往下走了,只不过有不同的情况if (client == null) {System.out.println("null.....");} else {client.configureBlocking(false); //重点 连接socket非阻塞// socket// 服务端的listen socket(连接请求三次握手后,通过accept 得到 连接的socket)// 连接socket(连接后的数据读写使用的)int port = client.socket().getPort();System.out.println("client..port: " + port);clients.add(client);}ByteBuffer buffer = ByteBuffer.allocateDirect(4096); //可以在堆里 堆外//遍历已经连接进来的客户端能不能读写数据for (SocketChannel c : clients) { //串行化!!!! 多线程!!int num = c.read(buffer); // >0 -1 0 //不会阻塞if (num > 0) {buffer.flip();byte[] aaa = new byte[buffer.limit()];buffer.get(aaa);String b = new String(aaa);System.out.println(c.socket().getPort() + " : " + b);buffer.clear();}}}
}
NIO的弊端
-
每次获取数据是O(n)级别的时间复杂度
- 例如有10w连接,只有100个数据传输,nio需要10w次recv系统调用,这里面大部分的系统调用是无意义的(没有数据传输,徒增成本)
多路复用IO
程序通过一次系统调用获得其中IO状态,然后程序自己实现对于有状态的IO进行R/W,时间负责度O(m) + O(1)
无论select、poll还是nio,本质上都是对程序持有的fds依次遍历,只不过区别是nio的依次遍历发生在程序侧,需要发生n次系统调用,以及n次用户态内核态的切换;而select和poll,是需要程序传递fds给内核,内核触发遍历,只发生一次系统调用。
linux下 多路复用器
- SELECT POSIX规范
- 受到FD_SETSIZE的限制,一次最多1024个fd
- 每次都要重新传递fds,每次内核调用都需要触发全量调用传递的fds
- POLL
- 不受到FD_SETSIZE的限制
- 每次都要重新传递fds,每次内核调用都需要触发全量调用传递的fds
- EPOLL
- 内核开辟空间保存fd,规避程序重复传递fd的问题和遍历全量fd的问题
public class SocketMultiplexingSingleThreadv1 {private ServerSocketChannel server = null;//linux 多路复用器(select poll epoll kqueue) nginx event{}private Selector selector = null; int port = 9090;public void initServer() {try {server = ServerSocketChannel.open();server.configureBlocking(false);server.bind(new InetSocketAddress(port));//如果在epoll模型下,open--》 epoll_create -> fd3selector = Selector.open(); // select poll *epoll 优先选择:epoll 但是可以 -D修正//server 约等于 listen状态的 fd4/*register如果:select,poll:jvm里开辟一个数组 fd4 放进去epoll: epoll_ctl(fd3,ADD,fd4,EPOLLIN*/server.register(selector, SelectionKey.OP_ACCEPT);} catch (IOException e) {e.printStackTrace();}}public void start() {initServer();System.out.println("服务器启动了。。。。。");try {while (true) { //死循环Set<SelectionKey> keys = selector.keys();System.out.println(keys.size()+" size");//1,调用多路复用器(select,poll or epoll (epoll_wait))/*select()是啥意思:1,select,poll 其实 内核的select(fd4) poll(fd4)2,epoll: 其实 内核的 epoll_wait()*, 参数可以带时间:没有时间,0 : 阻塞,有时间设置一个超时selector.wakeup() 结果返回0懒加载:其实再触碰到selector.select()调用的时候触发了epoll_ctl的调用*/while (selector.select() > 0) {Set<SelectionKey> selectionKeys = selector.selectedKeys(); //返回的有状态的fd集合Iterator<SelectionKey> iter = selectionKeys.iterator();// NIO 对着每一个fd调用系统调用,浪费资源// 多路复用IO调用一次select方法,返回的那些fc可以R/Wwhile (iter.hasNext()) {SelectionKey key = iter.next();iter.remove(); //set 不移除会重复循环处理if (key.isAcceptable()) {//看代码的时候,这里是重点,如果要去接受一个新的连接//语义上,accept接受连接且返回新连接的FD对吧?//那新的FD怎么办?//select,poll,因为他们内核没有空间,那么在jvm中保存和前边的fd4那个listen的一起//epoll: 我们希望通过epoll_ctl把新的客户端fd注册到内核空间acceptHandler(key);} else if (key.isReadable()) {readHandler(key); //连read 还有 write都处理了//在当前线程,这个方法可能会阻塞 ,如果阻塞了十年,其他的IO早就没电了。。。//所以,为什么提出了 IO THREADS//redis 是不是用了epoll,redis是不是有个io threads的概念 ,redis是不是单线程的//tomcat 8,9 异步的处理方式 IO 和 处理上 解耦}}}}} catch (IOException e) {e.printStackTrace();}}public void acceptHandler(SelectionKey key) {try {ServerSocketChannel ssc = (ServerSocketChannel) key.channel();SocketChannel client = ssc.accept(); //来啦,目的是调用accept接受客户端 fd7client.configureBlocking(false);ByteBuffer buffer = ByteBuffer.allocate(8192);//你看,调用了register/*select,poll:jvm里开辟一个数组 fd7 放进去epoll: epoll_ctl(fd3,ADD,fd7,EPOLLIN*/client.register(selector, SelectionKey.OP_READ, buffer);System.out.println("-------------------------------------------");System.out.println("新客户端:" + client.getRemoteAddress());System.out.println("-------------------------------------------");} catch (IOException e) {e.printStackTrace();}}public void readHandler(SelectionKey key) {SocketChannel client = (SocketChannel) key.channel();ByteBuffer buffer = (ByteBuffer) key.attachment();buffer.clear();int read = 0;try {while (true) {read = client.read(buffer);if (read > 0) {buffer.flip();while (buffer.hasRemaining()) {client.write(buffer);}buffer.clear();} else if (read == 0) {break;} else {client.close();break;}}} catch (IOException e) {e.printStackTrace();}}public static void main(String[] args) {SocketMultiplexingSingleThreadv1 service = new SocketMultiplexingSingleThreadv1();service.start();}
}
不同多路复用器对应的系统指令
- POLL(jdk native 用户空间 保存了fd)
- 创建server并进行监听
- socket(PF_INET, SOCK_STREAM, IPPROTO_IP) = 4
- fcntl(4, F_SETFL, O_RDWR|O_NONBLOCK) = 0 //server.configureBlocking(false);
- bind(4, {sa_family=AF_INET, sin_port=htons(9090)
- listen(4, 50)
- 多路复用建立连接
- poll([{fd=5, events=POLLIN}, {fd=4, events=POLLIN}], 2, -1) = 1 ([{fd=4, revents=POLLIN}])
- accept(4, = 7 //获取到新的客户端
- fcntl(7, F_SETFL, O_RDWR|O_NONBLOCK) //非阻塞
- poll([{fd=5, events=POLLIN}, {fd=4, events=POLLIN}, {fd=7, events=POLLIN}], 3, -1) = 1 //selector.select() 注册新的客户端并查询客户端fd是否有状态变更
- 创建server并进行监听
- EPOLL
- 创建server并进行监听
- socket(PF_INET, SOCK_STREAM, IPPROTO_IP) = 4
- fcntl(4, F_SETFL, O_RDWR|O_NONBLOCK) = 0 //server.configureBlocking(false);
- bind(4, {sa_family=AF_INET, sin_port=htons(9090)
- listen(4, 50)
- 多路复用创建连接
- epoll_create(256) = 7 (epfd)
- epoll_ctl(7, EPOLL_CTL_ADD, 4,
- epoll_wait(7, {{EPOLLIN, {u32=4, u64=2216749036554158084}}}, 4096, -1) = 1 // selector.select()
- accept(4 =8 //建立连接获取新client的fd
- fcntl(8, F_SETFL, O_RDWR|O_NONBLOCK) //设置为非阻塞
- epoll_ctl(7, EPOLL_CTL_ADD, 8, {EPOLLIN, //注册新的客户端到多路复用器
- epoll_wait(7, //等待有状态变更的fd返回
- 创建server并进行监听
多线程Selector
多线程 多路复用IO
分一个bossGroup 和一个workerGroup;bossGroup负责listen,workGroup负责人R/W
public class MainThread {public static void main(String[] args) {//1,创建 IO Thread (一个或者多个)SelectorThreadGroup boss = new SelectorThreadGroup(3); //混杂模式//boss有自己的线程组SelectorThreadGroup worker = new SelectorThreadGroup(3); //混杂模式//worker有自己的线程组boss.setWorker(worker);//但是,boss得多持有worker的引用:/*** boss里选一个线程注册listen , 触发bind,从而,这个不选中的线程得持有 workerGroup的引用* 因为未来 listen 一旦accept得到client后得去worker中 next出一个线程分配*/boss.bind(9999);boss.bind(8888);boss.bind(6666);boss.bind(7777);}
}
public class SelectorThread extends ThreadLocal<LinkedBlockingQueue<Channel>> implements Runnable{// 每线程对应一个selector,// 多线程情况下,该主机,该程序的并发客户端被分配到多个selector上//注意,每个客户端,只绑定到其中一个selector//其实不会有交互问题Selector selector = null;LinkedBlockingQueue<Channel> lbq = get(); //lbq 在接口或者类中是固定使用方式逻辑写死了。你需要是lbq每个线程持有自己的独立对象SelectorThreadGroup stg;@Overrideprotected LinkedBlockingQueue<Channel> initialValue() {return new LinkedBlockingQueue<>();}SelectorThread(SelectorThreadGroup stg){try {this.stg = stg;selector = Selector.open();} catch (IOException e) {e.printStackTrace();}}@Overridepublic void run() {//Loopwhile (true){try {//1,select()int nums = selector.select(); //阻塞 wakeup()//2,处理selectkeysif(nums>0){Set<SelectionKey> keys = selector.selectedKeys();Iterator<SelectionKey> iter = keys.iterator();while(iter.hasNext()){ //线程处理的过程SelectionKey key = iter.next();iter.remove();if(key.isAcceptable()){ //复杂,接受客户端的过程(接收之后,要注册,多线程下,新的客户端,注册到那里呢?)acceptHandler(key);}else if(key.isReadable()){readHander(key);}else if(key.isWritable()){}}}//3,处理一些task : listen clientif(!lbq.isEmpty()){//只有方法的逻辑,本地变量是线程隔离的Channel c = lbq.take();if(c instanceof ServerSocketChannel){ServerSocketChannel server = (ServerSocketChannel) c;server.register(selector,SelectionKey.OP_ACCEPT);System.out.println(Thread.currentThread().getName()+" register listen");}else if(c instanceof SocketChannel){SocketChannel client = (SocketChannel) c;ByteBuffer buffer = ByteBuffer.allocateDirect(4096);client.register(selector, SelectionKey.OP_READ, buffer);System.out.println(Thread.currentThread().getName()+" register client: " + client.getRemoteAddress());}}} catch (Exception e) {e.printStackTrace();} }}private void readHander(SelectionKey key) {System.out.println(Thread.currentThread().getName()+" read......");ByteBuffer buffer = (ByteBuffer)key.attachment();SocketChannel client = (SocketChannel)key.channel();buffer.clear();while(true){try {int num = client.read(buffer);if(num > 0){buffer.flip(); //将读到的内容翻转,然后直接写出while(buffer.hasRemaining()){client.write(buffer);}buffer.clear();}else if(num == 0){break;}else {//客户端断开了System.out.println("client: " + client.getRemoteAddress()+"closed......");key.cancel();break;}} catch (IOException e) {e.printStackTrace();}}}private void acceptHandler(SelectionKey key) {System.out.println(Thread.currentThread().getName()+" acceptHandler......");ServerSocketChannel server = (ServerSocketChannel)key.channel();try {SocketChannel client = server.accept();client.configureBlocking(false);stg.nextSelectorV3(client);// stg.nextSelectorV2(client);} catch (IOException e) {e.printStackTrace();}}public void setWorker(SelectorThreadGroup stgWorker) {this.stg = stgWorker;}
}
public class SelectorThreadGroup { //天生都是bossSelectorThread[] sts;ServerSocketChannel server=null;AtomicInteger xid = new AtomicInteger(0);SelectorThreadGroup stg = this;public void setWorker(SelectorThreadGroup stg){this.stg = stg;}SelectorThreadGroup(int num){//num 线程数sts = new SelectorThread[num];for (int i = 0; i < num; i++) {sts[i] = new SelectorThread(this);new Thread(sts[i]).start();}}public void bind(int port) {try {server = ServerSocketChannel.open();server.configureBlocking(false);server.bind(new InetSocketAddress(port));//注册到那个selector上呢?nextSelectorV3(server);} catch (IOException e) {e.printStackTrace();}}public void nextSelectorV3(Channel c) {try {if(c instanceof ServerSocketChannel){SelectorThread st = next(); //listen 选择了 boss组中的一个线程后,要更新这个线程的work组st.lbq.put(c);st.setWorker(stg);st.selector.wakeup();}else {SelectorThread st = nextV3(); //在 main线程种,取到堆里的selectorThread对象//1,通过队列传递数据 消息st.lbq.add(c);//2,通过打断阻塞,让对应的线程去自己在打断后完成注册selectorst.selector.wakeup();}} catch (InterruptedException e) {e.printStackTrace();}}//无论 serversocket socket 都复用这个方法private SelectorThread next() {int index = xid.incrementAndGet() % sts.length; //轮询就会很尴尬,倾斜return sts[index];}private SelectorThread nextV3() {int index = xid.incrementAndGet() % stg.sts.length; //动用worker的线程分配return stg.sts[index];}
}
Netty
ButeBuf
类似于jdk原生ByteBuffer封装
//initialCapacity maxCapacity 默认分配为堆外内存
//ByteBuf buf = PooledByteBufAllocator.DEFAULT.heapBuffer(8, 20);
ByteBuf buf = PooledByteBufAllocator.DEFAULT.heapBuffer(8, 20);System.out.println("buf.isReadable() :" + buf.isReadable());
System.out.println("buf.readerIndex() :" + buf.readerIndex());
System.out.println("buf.readableBytes() " + buf.readableBytes());
System.out.println("buf.isWritable() :" + buf.isWritable());
System.out.println("buf.writerIndex() :" + buf.writerIndex());
System.out.println("buf.writableBytes() :" + buf.writableBytes());
System.out.println("buf.capacity() :" + buf.capacity());
System.out.println("buf.maxCapacity() :" + buf.maxCapacity());
//是否为堆外内存
System.out.println("buf.isDirect() :" + buf.isDirect());
Client端
NioEventLoopGroup
NioSocketChannel
客户端读写需要注册到类似于多路复用器
@Test
public void clientMode() throws Exception {NioEventLoopGroup thread = new NioEventLoopGroup(1);//客户端模式:NioSocketChannel client = new NioSocketChannel();thread.register(client); //epoll_ctl(5,ADD,3)//响应式:输入处理ChannelPipeline p = client.pipeline();p.addLast(new MyInHandler());//reactor 异步的特征ChannelFuture connect = client.connect(new InetSocketAddress("192.168.150.11", 9090));ChannelFuture sync = connect.sync();ByteBuf buf = Unpooled.copiedBuffer("hello server".getBytes());ChannelFuture send = client.writeAndFlush(buf);//同步处理 send.sync();//同步处理 等待服务端断开连接sync.channel().closeFuture().sync();System.out.println("client over....");
}class MyInHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRegistered(ChannelHandlerContext ctx) throws Exception {System.out.println("client registed...");}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("client active...");}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;// 这里readCharSequence会移动ByteBuffer里面的指针,所以再写回去是没有数据的// CharSequence str = buf.readCharSequence(buf.readableBytes(), CharsetUtil.UTF_8);CharSequence str = buf.getCharSequence(0, buf.readableBytes(), CharsetUtil.UTF_8);System.out.println(str);ctx.writeAndFlush(buf);}
}
Server端
NioEventLoopGroup
NioServerSocketChannel
还是响应式编程,有客户端连接后通过acceptHandler进行accept和注册R/W Handler
@Test
public void serverMode() throws Exception {NioEventLoopGroup thread = new NioEventLoopGroup(1);NioServerSocketChannel server = new NioServerSocketChannel();thread.register(server);ChannelPipeline p = server.pipeline();//这里通过ChannelInit处理注册R/W处理器//accept接收客户端,并且注册到selectorp.addLast(new MyAcceptHandler(thread, new ChannelInit())); ChannelFuture bind = server.bind(new InetSocketAddress("192.168.150.1", 9090));bind.sync().channel().closeFuture().sync();System.out.println("server close....");
}class MyAcceptHandler extends ChannelInboundHandlerAdapter {private final EventLoopGroup selector;private final ChannelHandler handler;public MyAcceptHandler(EventLoopGroup thread, ChannelHandler myInitHandler) {this.selector = thread;this.handler = myInitHandler; //ChannelInit}@Overridepublic void channelRegistered(ChannelHandlerContext ctx) throws Exception {System.out.println("server registerd...");}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {// listen socket accept client// socket R/W ByteBufSocketChannel client = (SocketChannel) msg;//响应式的 handlerChannelPipeline p = client.pipeline();p.addLast(handler); //1,client::pipeline[ChannelInit,]//注册selector.register(client);}
}/**
* @ChannelHandler.Sharable表示线程间共享
* 通过ChannelInit初始化来进行与业务处理器之间的解耦
*/
@ChannelHandler.Sharable
class ChannelInit extends ChannelInboundHandlerAdapter {@Overridepublic void channelRegistered(ChannelHandlerContext ctx) throws Exception {Channel client = ctx.channel();ChannelPipeline p = client.pipeline();p.addLast(new MyInHandler());//2,client::pipeline[ChannelInit,MyInHandler]//用完清除自己即可ctx.pipeline().remove(this);//3,client::pipeline[MyInHandler]}
}
Nio Bootstrap
Client端
@Test
public void nettyClient() throws InterruptedException {NioEventLoopGroup group = new NioEventLoopGroup(1);Bootstrap bs = new Bootstrap();ChannelFuture connect = bs.group(group).channel(NioSocketChannel.class)// .handler(new ChannelInit()).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();p.addLast(new MyInHandler());}}).connect(new InetSocketAddress("192.168.150.11", 9090));Channel client = connect.sync().channel();ByteBuf buf = Unpooled.copiedBuffer("hello server".getBytes());ChannelFuture send = client.writeAndFlush(buf);send.sync();client.closeFuture().sync();
}
Server端
@Test
public void nettyServer() throws InterruptedException {NioEventLoopGroup group = new NioEventLoopGroup(1);ServerBootstrap bs = new ServerBootstrap();ChannelFuture bind = bs.group(group, group).channel(NioServerSocketChannel.class)// .childHandler(new ChannelInit()).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();p.addLast(new MyInHandler());}}).bind(new InetSocketAddress("192.168.150.1", 9090));bind.sync().channel().closeFuture().sync();}
C10K problem
单机连接10k客户端问题,随着单服务端连接客户端越来越多,才逐渐出现Nio、多路复用IO等模型
http://www.kegel.com/c10k.html
public static void main(String[] args) {LinkedList<SocketChannel> clients = new LinkedList<>();InetSocketAddress serverAddr = new InetSocketAddress("192.168.150.11", 9090);//端口号的问题:65535// windowsfor (int i = 10000; i < 65000; i++) {try {SocketChannel client1 = SocketChannel.open();SocketChannel client2 = SocketChannel.open();/*linux中你看到的连接就是:client...port: 10508client...port: 10508*/client1.bind(new InetSocketAddress("192.168.150.1", i));// 192.168.150.1:10000 192.168.150.11:9090client1.connect(serverAddr);clients.add(client1);client2.bind(new InetSocketAddress("192.168.110.100", i));// 192.168.110.100:10000 192.168.150.11:9090client2.connect(serverAddr);clients.add(client2);} catch (IOException e) {e.printStackTrace();}}System.out.println("clients "+ clients.size());try {System.in.read();} catch (IOException e) {e.printStackTrace();}
}
IO数据流向图
EPOLL即为在内核空间中通过链表保存有数据状态变更的fd,直接从链表中获取fd即可
相关文章:
IO知识整理
IO 面向系统IO page cache 程序虚拟内存到物理内存的转换依靠cpu中的mmu映射 物理内存以page(4k)为单位做分配 多个程序访问磁盘上同一个文件,步骤 kernel将文件内容加载到pagecache多个程序读取同一份文件指向的同一个pagecache多个程…...
【正点原子FPGA连载】第十三章QSPI Flash读写测试实验 摘自【正点原子】DFZU2EG_4EV MPSoC之嵌入式Vitis开发指南
1)实验平台:正点原子MPSoC开发板 2)平台购买地址:https://detail.tmall.com/item.htm?id692450874670 3)全套实验源码手册视频下载地址: http://www.openedv.com/thread-340252-1-1.html 第十三章QSPI Fl…...
深入理解mysql的内核查询成本计算
MySql系列整体栏目 内容链接地址【一】深入理解mysql索引本质https://blog.csdn.net/zhenghuishengq/article/details/121027025【二】深入理解mysql索引优化以及explain关键字https://blog.csdn.net/zhenghuishengq/article/details/124552080【三】深入理解mysql的索引分类&a…...
LeetCode 141. 环形链表
原题链接 难度:easy\color{Green}{easy}easy 题目描述 给你一个链表的头节点 headheadhead ,判断链表中是否有环。 如果链表中有某个节点,可以通过连续跟踪 nextnextnext 指针再次到达,则链表中存在环。 为了表示给定链表中的…...
git提交
文章目录关于数据库:桌面/vue-admin/vue_shop_api 的 git 输入 打开 phpStudy ->mySQL管理器 导入文件同时输入密码,和文件名 node app.js 错误区: $ git branch // git branch 查看分支 只有一个main分支不见master解决: gi…...
Java中常见的编码集问题
收录于热门专栏Java基础教程系列(进阶篇) 一、遇到一个问题 1、读取CSV文件 package com.guor.demo.charset;import java.io.BufferedReader; import java.io.FileReader; import java.util.ArrayList; import java.util.HashMap; import java.util.L…...
数据结构与算法(Java版) | 就让我们来看看几个实际编程中遇到的问题吧!
上一讲,我给大家简单介绍了一下数据结构,以及数据结构与算法之间的关系,照理来说,接下来我就应该要给大家详细介绍线性结构和非线性结构了,但是在此之前,我决定还是先带着大家看几个实际编程中遇到的问题&a…...
【C++算法】dfs深度优先搜索(上) ——【全面深度剖析+经典例题展示】
💃🏼 本人简介:男 👶🏼 年龄:18 📕 ps:七八天没更新了欸,这几天刚搞完元宇宙,上午一直练🚗,下午背四级单词和刷题来着,还在忙一些学弟…...
总结高频率Vue面试题
目录 什么是三次握手? 什么是四次挥手?(close触发) 什么是VUEX? 什么是同源----跨域? 什么是Promise? 什么是fexl布局? 数据类型 什么是深浅拷贝? 什么是懒加载&…...
IP协议详解
目录 前言: IP协议 提出问题 解决方案 地址管理 子网掩码 路由选择 小结: 前言: IP协议作为网络层知名协议。当数据经过传输层使用TCP或者UDP对数据进行封装,然后当数据到达网络层,基于TCP或UDP数据包继续进行…...
webpack5 基础配置
在开发中,我们会使用 vue、react、less、scss等语法进行开发项目,但是浏览器只能识别 js、css,或者说在js中使用了es6中的import 导入 这时候也需要打包工具去转换成浏览器可以识别的语句。 一、使用webpack 1.初始化package.json npm i…...
IDEA入门安装使用教程
一、背景 作为一个Java开发者,有非常多编辑工具供我们选择,比如Eclipse、IntelliJ IDEA、NetBeans、Visual Studio Code、Sublime Text等等,这些有免费也有收费的,但是就目前市场占比来说普遍使用Eclipse和IntelliJ IDEA这两款主…...
Lambda表达式使用及详解
一 Lambda表达式的简介 Lambda表达式(闭包):java8的新特性,lambda运行将函数作为一个方法的参数,也就是函数作为参数传递到方法中。使用lambda表达式可以让代码更加简洁。 Lambda表达式的使用场景:用以简…...
JAVA练习52-打家劫舍
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 目录 前言 一、题目-打家劫舍 1.题目描述 2.思路与代码 2.1 思路 2.2 代码 总结 前言 提示:这里可以添加本文要记录的大概内容: 2月16日练习内容 提…...
简单谈一谈幂等测试
1、什么是幂等测试 幂等是一个抽象的概念,在编程中一个幂等操作的特点是其任意多次执行所产生的影响均与一次执行的影响相同,即多次调用方法或者接口不会改变业务状态,可以保证重复调用的结果和单次调用的结果一致。幂等测试,则主…...
typescript复习笔记
数组类型-限定每一项的类型 //写法一 const arrNumber: number[] [1, 2, 3] const arrString: string[] [a, b, c] //写法二 const arrNumber2: Array<number> [1, 2, 3] const arrString2: Array<string> [a, b, c]联合类型 符号是 | //数组可以存放字符串或…...
webstorm开发electron,调试主进程方案
官网教程地址:https://www.electronjs.org/zh/docs/latest/tutorial/debugging-main-process 我只能说官网太看得起人了,整这么简易的教程…… 命令行开关 第一步还是要按要求在我们的package.json里加上端口监听:–inspect5858 我的命令…...
2W字正则表达式基础知识总结,这一篇就够了!!(含前端常用案例,建议收藏)
正则表达式 (Regular Expression,简称 RE 或 regexp ) 是一种文本模式,包括普通字符(例如,a 到 z 之间的字母)和特殊字符(称为"元字符")正则表达式使用单个字符串来描述、匹配一系列匹…...
自学web前端觉得好难,可能你遇到了这些困境
好多人跟我说上学的时候也学过前端,毕业了想从事web前端开发的工作,但自学起来好难,快要放弃了,所以我总结了一些大家遇到的困境,希望对你会有所帮助。 目录 1. 意志是否坚定 2. 没有找到合适自己的老师 3. 为了找…...
ASEMI中低压MOS管18N20参数,18N20封装,18N20尺寸
编辑-Z ASEMI中低压MOS管18N20参数: 型号:18N20 漏极-源极电压(VDS):200V 栅源电压(VGS):30V 漏极电流(ID):18A 功耗(PD&#x…...
[NetBackup]客户端安装后server无法连通client
client name处填写客户端主机名,server to use for backups and restores处填写server端名字,与hosts文件内保持一致;source client for restores处填写client主机名,与server端hosts文件中保持一致,与主机实际名称保持…...
黑马Java后端项目实战--在线聊天交友
【课程简介】 越来越多的系统都有消息推送的功能,如聊天室、邮件推送、系统消息推送等; 要实现消息推送就需要服务端在数据有变化时主动推送消息给客户端,本次课程将带大家使用websocket实现消息推送。 【主讲内容】 1.方法:如…...
【实战系列 2】Yapi接口管理平台Getshell-Linux后门权限维持与痕迹清除
文章目录 前言一、网站主页到Getshell二、SSH软链接后门三、Linux权限维持 --隐藏踪迹3.1 隐藏远程SSH登陆记录3.2、ssh软链接后门连接失败的原因以及解决办法3.3、隐藏踪迹-痕迹清楚3.3.1、隐藏历史操作命令3.3.2、隐藏文件/文件夹3.3.3、修改文件时间戳3.3.4、隐藏权限3.3.5、…...
设计模式之抽象工厂模式(C++)
作者:翟天保Steven 版权声明:著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处 一、抽象工厂模式是什么? 抽象工厂模式是一种创建型的软件设计模式,该模式相当于升级版的工厂模式。 如果…...
Kotlin新手教程一(Kotlin简介及环境搭建)
目录一、 什么是Kotlin?二、为什么要使用Kotlin?三、使用IntelliJ IDEA搭建Kotlin四、Kotlin使用命令行编译一、 什么是Kotlin? Kotlin 是一种在 Java 虚拟机上运行的静态类型编程语言,它也可以被编译成为 JavaScript 源代码&…...
【虚拟仿真】Unity3D打包WEBGL实现全屏切换
推荐阅读 CSDN主页GitHub开源地址Unity3D插件分享简书地址我的个人博客 大家好,我是佛系工程师☆恬静的小魔龙☆,不定时更新Unity开发技巧,觉得有用记得一键三连哦。 一、前言 今天实现Unity3D打包WEBGL后实现按钮点击全屏和退出 全屏的实现…...
java对象内存结构分析与大小计算
java对象内存结构Java对象保存在堆中时,由三部分组成:对象头(object header):包括了关于堆对象的布局、类型、GC状态、同步状态和标识哈希码的基本信息。所有java对象都有一个共同的对象头格实例数据(Insta…...
RabbitMQ学习(七):交换器
〇、前言在之前的内容中,我们创建了一个工作队列。我们假设的是工作队列背后,每个任务都恰好交付给一个消 费者(工作进程)。在今天的内容中,我们将做一些完全不同的事情——我们将消息传达给多个消费者。这种模式 称为 “发布/订阅”。为了说…...
cmd命令大全
文章目录变量输入输出逻辑命令符控制语句函数注释变量 在批处理中,变量全部是弱类型的,通常可以当做字符串处理 1.初始化定义 set varthis a var 2.获取变量值 %var% 3.链接 set varcat%var1%%var2% 4.截取 %var:~n,m% n是起点,m是长度&…...
Git的使用方法(保姆级)
一、安装git二、创建凭据 ①打开电脑的凭据管理器git:https://gitee.com是固定写法用户名、密码是你创建gitee的用户名、密码三、在gitee中创建一个仓库四、项目提交到仓库的方法①选择一个项目交由git管理按照步骤一中召唤小黑窗口输入 git init 就可以出现.git文件夹②右键选…...
辽宁平台网站建设平台/百度网盘资源搜索引擎
题目:原题链接(困难) 标签:SQL 解法时间复杂度空间复杂度执行用时Ans 1 (Python)450ms (5.06%)Ans 2 (Python)Ans 3 (Python) 解法一: SELECT T.product_id,P.product_name,T.report_year,T.total_amount FROM (SEL…...
怎么在网站标头做图标/艺术培训学校招生方案
Linux操作系统性能评测与测试指标浅析性能测试是对一个操作系统运行效率进行评价的关键环节。我们采用适当的性能测试工具集,在保证工具正确运行和基准软硬件测试环境一致的前提下,运行性能测试工具,对测试数据进行收集和处理分析,…...
抽奖机网站怎么做/2023年火爆的新闻
太让人郁闷了,没想到竟然有人会兴师动众的用DDos攻击个人电脑,SynFlood这东西很久以前有玩过,不过被当作攻击目标确实是第一次,感觉还真是不一样。不过结果有点惨了,不是开8000端口的程序崩溃,而是阻止攻击…...
成都网站建设服务密需湖南岚鸿案例/百度云搜索引擎官网入口
打开Excel->工具->宏->Viaual Basic编辑器在弹出来的窗口中对着VBAproject点右键->插入->模块下面会出现一个名为"模块1",点击在右边的空白栏中粘贴以下内容:Function getpychar(char)tmp 65536 Asc(char)If (tmp > 45217…...
企业网站用视频做首页/今日竞彩足球最新比赛结果查询
1. 概述 本章主要讲解.net4.5如何实现多线程和异步处理的相关内容。 2. 主要内容 2.1 理解线程 ① 使用Thread类 public static class Program { public static void ThreadMethod() { for (int i 0; i < 10; i) { Console.WriteLine(“ThreadProc: {0}”, i); Thread.Slee…...
唐山高端网站建设/国内重大新闻10条
author:skate time:2013/03/01 mysql在线无性能影响删除7G大表 如何在mysql数据库里删除7G(或更大)大表,使其又不影响服务器的io,导致性能下降影响业务。先不说其是mysql表,就是普通文件,如果直接rm删除&a…...