网络编程总结

前言

从一个例子出发:

假设客户端希望从服务器下载一个名为 file.txt 的文件,整个过程大致如下:

  1. 客户端发起 HTTP 请求
    客户端通过向服务器发送一个 HTTP 请求来获取文件。请求的格式如下:GET /file.txt HTTP/1.1 Host: www.oy.com
    客户端将通过 URL 解析和 DNS 查询,获取到目标服务器的 IP 地址。

  2. 建立 TCP 连接(TCP 三次握手)
    在发送 HTTP 请求之前,客户端和服务器需要通过 TCP 协议建立一个可靠的连接。三次握手的过程如下:

  • 客户端发送 SYN 请求,向服务器提出连接请求。
  • 服务器回复 SYN-ACK,确认连接请求。
  • 客户端再次发送 ACK,表示连接建立完成。
  1. 客户端发送 HTTP 请求
    建立 TCP 连接后,客户端通过该连接发送 HTTP 请求,请求服务器提供 file.txt 文件。

  2. 服务器处理请求并发送响应
    服务器接收到请求后,从磁盘读取文件内容(例如,file.txt),并通过 TCP 连接将文件内容作为 HTTP 响应回传给客户端。为了提高性能,服务器可能会使用零拷贝技术,以避免重复的内存拷贝操作。

  3. 客户端接收响应并关闭连接
    客户端通过 TCP 协议接收响应,并将文件保存到本地。当文件接收完毕后,客户端可以发送一个 FIN 请求,开始断开连接。服务器确认后,完成四次挥手,安全地断开连接。

在上面的第四步服务器处理请求并发送响应时,系统往往要进行IO处理,常见的IO模型如下:

I/O模型

核心概念:

阻塞与非阻塞

个人理解:阻塞和非阻塞一般讨论的是面对请求时的一种状态

阻塞(Blocking)

阻塞 I/O 模型是指在 I/O 操作执行期间,程序会被阻塞,直到该操作完成。在此期间,进程无法执行其他任务。

例如:

服务器通过read() 系统调用来读取硬盘中的数据,当我们请求的资源不可用时(资源被占用,没有数据到达等等),会使得进程休眠,从现象看就是卡在那里。

非阻塞(Non-blocking)

  • 非阻塞 I/O 模型是指在 I/O 操作过程中,程序不会被阻塞。如果没有数据可以读取或写入,调用会立即返回,而不是等待 I/O 操作完成。
  • 使用非阻塞 I/O 时,程序可以进行轮询或采取其他方法来检查是否可以继续处理 I/O 操作。

例如:

服务器程序可以通过设置O_NONBLOCK 标志将文件描述符设置为非阻塞模式,在这种模式下,如果 read() 无法立即完成,它们会立即返回而不会阻塞进程。

同步与异步

对于同步和异步,往往需要讨论的是多任务的场景,还得先引入并发并行的概念。

并发:多个任务在同一个时间段内同时执行,如果是单核心计算机,CPU 会不断地切换任务来完成并发操作。

并行:多任务在同一个时刻同时执行,计算机需要有多核心,每个核心独立执行一个任务,多个任务同时执行,不需要切换。

同步:多任务开始执行,任务 A、B、C 全部执行完成后才算是结束。

异步:多任务开始执行,只需要主任务 A 执行完成就算结束,主任务执行的时候,可以同时执行异步任务 B、C,主任务 A 可以不需要等待异步任务 B、C 的结果。

一般系统中的同步,可以通过串行来实现,也就是一个任务一个任务处理,而异步可以通过并发或者并行。

五种模型

在网络IO的处理中,服务器还需要经过以下过程:

2.drawio

1. 阻塞IO模式

从上面的解释就可以知道,当服务器进程read读取消息内容时,如果该进程请求的资源不可用时(资源被占用,没有数据到达等等),会使得进程休眠,从现象看就是卡在那里。

2. 非阻塞IO模式

如果服务器进程将read设置为非阻塞,在读取消息数据时,会立刻返回读取的结果,不会等待。

3. 多路复用IO模式

讨论并发的情况:若我们服务器进程想要处理多个客户端发送请求,此时需要靠多线程来帮助实现,同时,我们的线程无法知道什么时候会有数据到达,因此我们多个的线程只能不断的去进询问,查看缓冲区是否有数据可读。这带来了两个问题:

  • 如果请求数量过多,消耗的线程数量巨大
  • 多个线程需要一直查询,无数据可读时,其余时间白白浪费了资源

因此,我们希望有个方法,只需要少量的资源,例如一个进程或者一个线程就能监控这庞大的请求。linux中提供了三个系统函数:selectpollepoll。后续在详细讨论三个函数内容。简单来说,有了这些方法,我们的服务器进程可以调用一个它们就能监控多个客户端的请求,只要有资源可读时,函数就能返回可读的状态,这样避免了资源的浪费。

4. 信号驱动IO

在上面的多路复用IO模型中,都需要通过轮询的方法去进行检查可读状态,例如select,poll轮询所有文件描述符,而epoll是事件驱动,只监测变化的事件。它们的本质还是得需要一个线程去持续的监督。

而信号驱动 I/O 是通过 信号 通知进程或线程某个 I/O 操作已经完成。当 I/O 完成时,操作系统会向应用程序发送一个信号(如 SIGIO),通知它进行相应的处理。

信号驱动 I/O 通过异步的方式来通知进程,进程在收到信号时才会进行 I/O 操作。

5. 异步IO

以上讨论的四种办法中,都是进程在得知可读状态之后再去读取数据,那么为什么不能直接让系统去处理这个接受和读取数据的过程呢。

在前言部分已经解释了这个异步的概念。我们的进程不再负责数据的询问和读取,将这部分内容交给了内核去处理,主进程通知内核去读取数据,然后就去执行其他任务,当内核读取数据结束之后,通知主进程,主进程再处理这些数据,或者让线程处理。

一些关键区别:

信号驱动IO异步IO的区别:数据读取方是谁。

在得知数据可读的这个过程,两者其实都是异步的,即都不需要主进程参与,但在得知数据可读之后,信号驱动IO还是需要主进程去读取这个数据,而异步IO模型是由系统读取完了直接通知主进程。

关键系统函数

零拷贝

当我们服务器程序需要发送数据时,其过程如下:
1.drawio

一个文件的发送需要以下过程:

  • 调用读取函数,切换内核态,将数据从磁盘拷贝至缓冲区

  • 切换到用户态,将数据从缓冲区读取至进程缓冲区中。

  • 将数据从进程缓冲区拷贝至socket缓冲区。

  • 最后再拷贝至网卡中,再由网卡进行数据发送

可以看出浪费了许多时间。

在linux中提供了两个方法:

mmap

  • 应用进程调用了mmap()后,系统会把磁盘的数据拷贝到内核的缓冲区里。接着,应用进程跟操作系
    统内核「共享」这个缓冲区;
  • 应用进程再调用write(),操作系统直接将内核缓冲区的数据拷贝到socket缓冲区中,这一切都发生在内核态,由 CPU来搬运数据;
  • 最后,系统把内核的 socket 缓冲区里的数据,拷贝到网卡的缓冲区里。

sendfile

函数形式如下:

1
2
#include <sys/socket.h>
ssize_t sendfile(int out_fd, int in_fd, off_t *offset, size_t count);

它的前两个参数分别是目的端和源端的文件描述符,后面两个参数是源端的偏移量和复制数据的长度,返回值是实际复制数据的长度。

  • 应用进程调用了sendfile()之后,系统会把磁盘的数据拷贝到内核的缓冲区里。然后操作系统直接将内核缓冲区的数据拷贝到socket缓冲区中,这一切都发生在内核态,由CPU来搬运数据,不在需要write函数
  • 最后,系统把内核的 socket 缓冲区里的数据,拷贝到网卡的缓冲区里。

复用IO函数

1. select

概述

select 是最早的 I/O 多路复用机制之一,最早在 BSD Unix 中引入,并广泛应用于各类操作系统(包括 Linux)。它允许程序在多个文件描述符上等待 I/O 事件的发生,并在事件发生时通知程序。

工作原理
  • 程序调用 select 函数并传递三个集合(readfdswritefdsexceptfds),分别表示要监控的可读、可写和异常状态的文件描述符。
  • select 阻塞直到至少一个文件描述符的状态发生变化(如可读、可写或异常)。
  • select 返回时,它会告诉程序哪些文件描述符可以进行相应的操作。
优缺点
  • 优点
    • 简单易用,广泛支持,几乎在所有操作系统上都可用。
  • 缺点
    • 性能问题:当监控的文件描述符数目很大时,select 会非常低效。每次调用时,程序需要将所有文件描述符从用户空间复制到内核空间,并且内核需要遍历所有的文件描述符进行检查。这样,文件描述符数目越多,性能就越差。
    • 文件描述符数量限制select 的文件描述符数量有上限(通常是 FD_SETSIZE,在 Linux 上通常是 1024)。这意味着,如果要监控更多的连接,需要对代码进行修改或使用其他方式。
示例
1
2
3
4
5
6
7
8
9
10
11
12
fd_set readfds;
FD_ZERO(&readfds);
FD_SET(socket_fd, &readfds);

struct timeval timeout;
timeout.tv_sec = 5;
timeout.tv_usec = 0;

int ret = select(socket_fd + 1, &readfds, NULL, NULL, &timeout);
if (ret > 0) {
// 处理 I/O
}

2. poll

概述

poll 是对 select 的改进,它解决了 select 文件描述符数量有限的缺点,但依然存在一些性能问题。poll 在功能上类似于 select,但它使用一个结构数组来表示要监控的文件描述符。

工作原理
  • 程序调用 poll 并传入一个 pollfd 结构数组,每个结构体包含文件描述符及其事件类型(POLLINPOLLOUTPOLLERR 等)。
  • poll 阻塞直到一个或多个文件描述符的状态发生变化。
  • poll 不会像 select 那样限制文件描述符数量(受限于系统配置),因此适用于更大的连接数。
优缺点
  • 优点
    • 没有文件描述符数量限制,可以监控更多的文件描述符。
    • 适用于中等规模的连接池。
  • 缺点
    • 性能问题:虽然没有文件描述符数量限制,但 poll 仍然需要在每次调用时遍历所有的文件描述符,因此它的性能在连接数非常大的时候仍然比较低效。
    • 内存消耗较高,因为需要为每个文件描述符分配一个 pollfd 结构。
示例
1
2
3
4
5
6
7
8
struct pollfd fds[1];
fds[0].fd = socket_fd;
fds[0].events = POLLIN;

int ret = poll(fds, 1, 5000); // 阻塞 5 秒
if (ret > 0) {
// 处理 I/O
}

3. epoll

概述

epoll 是 Linux 特有的 I/O 多路复用机制,它是 selectpoll 的进一步优化,设计用于解决这两者的性能瓶颈。epoll 采用事件驱动方式,它不需要每次调用时遍历所有文件描述符,而是通过内核事件通知机制来高效处理大量并发连接。

工作原理
  • 程序首先使用 epoll_create 创建一个 epoll 实例,该实例与一组文件描述符关联。
  • 程序通过 epoll_ctl 注册感兴趣的文件描述符及其事件(如可读、可写、异常等)。
  • 程序通过 epoll_wait 等待文件描述符的事件发生,一旦某个文件描述符有 I/O 操作准备好,epoll_wait 就会返回相应的事件。
  • 优势epoll 在事件触发时将只返回有事件的文件描述符,而不是遍历所有文件描述符,极大地提升了性能。
优缺点
  • 优点
    • 性能优越epoll 通过事件通知机制避免了每次调用时遍历所有文件描述符的开销,性能上比 selectpoll 优越,尤其在大量并发连接时。
    • 无需重新扫描epoll 支持边沿触发(edge-triggered)和水平触发(level-triggered),且每次只会返回有事件的文件描述符。
    • 无文件描述符数量限制epoll 不受文件描述符数量的限制,可以处理大规模的并发连接。
  • 缺点
    • 只在 Linux 上可用,不适用于其他操作系统。
    • 程序的设计可能稍微复杂,因为要处理事件的注册、等待和删除等操作。
示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
int epfd = epoll_create(1); // 创建 epoll 实例
struct epoll_event event;
event.events = EPOLLIN;
event.data.fd = socket_fd;
epoll_ctl(epfd, EPOLL_CTL_ADD, socket_fd, &event); // 注册文件描述符

struct epoll_event events[10];
int ret = epoll_wait(epfd, events, 10, -1); // 阻塞直到事件发生
if (ret > 0) {
for (int i = 0; i < ret; ++i) {
if (events[i].events & EPOLLIN) {
// 处理 I/O
}
}
}

区别

特性 select poll epoll
核心数据结构 fd_set(位掩码) pollfd 结构体数组 epoll_event 结构体数组
文件描述符管理 位掩码,固定大小(1024) 动态数组,无固定大小限制 红黑树 + 双向链表,高效管理
事件存储 每次调用需要重新设置 每次调用需要重新设置 内核维护,事件驱动
内存开销 固定大小,较大 动态分配,较大 动态分配,较小
效率 低(遍历所有 fd) 低(遍历所有 fd) 高(只处理活跃 fd)
时间复杂度 O(n)(遍历所有 fd) O(n)(遍历所有 fd) O(log n)(添加/删除)O(k)(事件等待,k 为活跃 fd)
文件描述符数量限制 有限制(通常为 1024) 无硬性限制 无硬性限制
触发模式 水平触发(LT) 水平触发(LT) 支持水平触发(LT)和边缘触发(ET)
跨平台支持 广泛支持 广泛支持 仅限 Linux
实现复杂度 简单 简单 复杂

总结

  1. select

    • 优点:实现简单,适合文件描述符数量较少的应用场景。
    • 缺点:文件描述符数量有限,性能随着文件描述符的增多急剧下降。每次调用时都需要遍历所有文件描述符,且需要在内核空间和用户空间之间复制文件描述符集合,消耗较大。
  2. poll

    • 优点:没有文件描述符数量的限制,相比 select,更加灵活,能够支持更多文件描述符。
    • 缺点:虽然没有 select 的限制,但仍然需要遍历所有文件描述符,性能问题依旧存在,尤其在大量并发连接时。
  3. epoll

    • 优点:性能极高,特别是在处理大量并发连接时。使用事件驱动机制,仅通知发生事件的文件描述符,避免了轮询和频繁的内存复制。适合高并发、大规模连接的应用。
    • 缺点:实现较为复杂,且只在 Linux 上可用。程序设计相对复杂,需要处理事件注册、删除和触发等操作。

一个http服务器的简易实现

采用半同步半反应堆形式实现。

缺点:

  • 主线程和工作线程共享请求队列,主线程往请求队列中添加任务,工作线程从请求队列中取出任务是互斥的操作,需要对请求队列进行加锁保护,导致白白浪费CPU时间;
  • 每个工作线程在同一时间内只能处理一个客户请求,如果客户较多而工作线程较少,就会导致任务队列中堆积大量的任务对象,此时客户端的响应将越来越慢。而如果通过增加工作线程来解决这个问题,就会因为大量的上下文切换导致消耗大量的CPU时间

主函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include<assert.h>
#include<stdio.h>
#include<unistd.h>
#include<errno.h>
#include<string.h>
#include<fcntl.h>
#include<stdlib.h>
#include<cassert>
#include<sys/epoll.h>

#include "14_2_locker.h"
#include "15_3_threadpool.h"
#include "15_4_http_conn.h"

#define MAX_FD 65536
#define MAX_EVENT_NUMBER 10000

extern int addfd(int epollfd,int fd,bool one_shot);
extern int removefd(int epollfd,int fd);

void addsig(int sig,void(handler)(int),bool restart = true){
struct sigaction sa;
memset(&sa,'\0',sizeof(sa));
sa.sa_handler = handler;
if(restart){
sa.sa_flags |= SA_RESTART;
}
sigfillset(&sa.sa_mask);
assert(sigaction(sig,&sa,NULL) != -1);
}

void show_error(int connfd,const char *info){
printf("%s",info);
send(connfd,info,strlen(info),0);
close(connfd);
}

int main(int argc,char *argv[]){
if(argc <= 2){
printf("usage: %s ip_address port_number\n",basename(argv[0]));
return 1;
}
const char *ip = argv[1];
int port = atoi(argv[2]);

addsig(SIGPIPE,SIG_IGN);//忽略SIGPIPE信号

threadpool<http_conn> *pool = NULL;
try{
pool = new threadpool<http_conn>;
}
catch(...){//捕获所有异常
return 1;
}

http_conn *users = new http_conn[MAX_FD];
assert(users);
int listenfd = socket(PF_INET,SOCK_STREAM,0);
assert(listenfd >= 0);

struct linger tmp = {1,0};//避免time_wait状态
//设置端口复用
// SO_LINGER 选项的设置有助于在快速关闭连接时让套接字能够尽快地释放资源,使得端口可以更快速地复用。
setsockopt(listenfd,SOL_SOCKET,SO_LINGER,&tmp,sizeof(tmp));

int ret = 0;
struct sockaddr_in address;
bzero(&address,sizeof(address));
address.sin_family = AF_INET;
inet_pton(AF_INET,ip,&address.sin_addr);
address.sin_port = htons(port);

ret = bind(listenfd,(struct sockaddr *)&address,sizeof(address));
assert(ret >= 0);

ret = listen(listenfd,5);
assert(ret >= 0);

epoll_event events[MAX_EVENT_NUMBER];
int epollfd = epoll_create(5);
assert(epollfd != -1);
addfd(epollfd,listenfd,false);
http_conn::m_epollfd = epollfd;

while (true){
int number = epoll_wait(epollfd,events,MAX_EVENT_NUMBER,-1);
if((number < 0) && (errno != EINTR)){
printf("epoll failure\n");
break;
}

for(int i=0; i<number;++i){
int sockfd = events[i].data.fd;
//处理新到的客户连接
if(sockfd == listenfd){
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof(client_address);
int connfd = accept(listenfd,(struct sockaddr *)&client_address,&client_addrlength);
if(connfd < 0){
printf("errno is: %d\n",errno);
continue;
}
if(http_conn::m_user_count >= MAX_FD){
show_error(connfd,"Internal server busy");
continue;
}
users[connfd].init(connfd,client_address);
}
else if(events[i].events & (EPOLLRDHUP | EPOLLHUP | EPOLLERR)){
users[sockfd].close_conn();
}
else if(events[i].events & EPOLLIN){
//根据读的结果,决定是将任务添加到线程池,还是关闭连接
if(users[sockfd].read()){
pool->append(users + sockfd);
}
else{
users[sockfd].close_conn();
}
}
else if(events[i].events & EPOLLOUT){
if(!users[sockfd].write()){
users[sockfd].close_conn();
}
}
else{}
}
}
close(epollfd);
close(listenfd);
delete [] users;
delete pool;
return 0;
}

http消息的读取与分析

http_conn.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
#ifndef HTTPCONNECTION_H
#define HTTPCONNECTION_H

#include<unistd.h> // read write close
#include<signal.h> // 信号处理
#include<sys/types.h> // socket bind listen accept
#include<sys/epoll.h>
#include<fcntl.h>
#include<sys/socket.h>
#include<netinet/in.h>// sockaddr_in
#include<arpa/inet.h> /// inet_pton
#include<assert.h>
#include<sys/stat.h> // stat
#include<string.h>
#include<pthread.h>
#include<stdio.h>
#include<stdlib.h>
#include<sys/mman.h>// mmap
#include<stdarg.h>// va_list va_start va_end
#include<errno.h>
#include "14_2_locker.h"

class http_conn{
public:
static const int FILENAME_LEN = 200; // 文件名的最大长度
static const int READ_BUFFER_SIZE = 2048; // 读缓冲区的大小
static const int WRITE_BUFFER_SIZE = 1024; // 写缓冲区的大小
enum METHOD{GET = 0,POST,HEAD,PUT,DELETE,TRACE,OPTIONS,CONNECT,PATCH}; // HTTP请求方法
enum CHECK_STATE{CHECK_STATE_REQUESTLINE = 0,CHECK_STATE_HEADER,CHECK_STATE_CONTENT}; // 解析客户请求时的主状态机状态
enum HTTP_CODE{NO_REQUEST,GET_REQUEST,BAD_REQUEST,NO_RESOURCE,FORBIDDEN_REQUEST,FILE_REQUEST,INTERNAL_ERROR,CLOSED_CONNECTION}; // 服务器处理HTTP请求的可能结果
enum LINE_STATUS{LINE_OK = 0,LINE_BAD,LINE_OPEN}; // 行的读取状态
public:
http_conn(){}
~http_conn(){}
public:
void init(int sockfd,const sockaddr_in &addr);// 初始化新接受的连接
void close_conn(bool real_close = true);// 关闭连接
void process();// 处理客户请求
bool read();// 非阻塞读操作
bool write();// 非阻塞写操作

private:
void init();
HTTP_CODE process_read();// 解析HTTP请求
bool process_write(HTTP_CODE ret);// 填充HTTP应答
// 下面这一组函数被process_read调用以分析HTTP请求
HTTP_CODE parse_request_line(char *text);// 解析请求行
HTTP_CODE parse_headers(char *text);// 解析请求头
HTTP_CODE parse_content(char *text);// 解析消息体
HTTP_CODE do_request();// 处理请求
char *get_line(){ return m_read_buf + m_start_line; }// 获取一行
LINE_STATUS parse_line();// 分析一行

// 下面这一组函数被process_write调用以填充HTTP应答
void unmap();// 释放资源
bool add_response(const char *format,...);// 向m_write_buf中写入响应
bool add_content(const char *content);// 向m_write_buf中写入内容
bool add_status_line(int status,const char *title);// 向m_write_buf中写入状态行
bool add_headers(int content_length);// 向m_write_buf中写入响应头
bool add_content_length(int content_length);// 向m_write_buf中写入Content-Length
bool add_linger();// 向m_write_buf中写入Connection
bool add_blank_line();// 向m_write_buf中写入空行
public:
// 所有socket上的事件都被注册到同一个epoll内核事件表中,所以epoll文件描述符设置为静态的
static int m_epollfd;
static int m_user_count;// 统计用户数量

private:
int m_sockfd;// 该HTTP连接的socket和对方的socket地址
sockaddr_in m_address;// 对方的socket地址
char m_read_buf[READ_BUFFER_SIZE];// 读缓冲区
int m_read_idx;// 标识读缓冲区中已经读入的客户数据的最后一个字节的下一个位置
int m_checked_idx;// 当前正在分析的字符在读缓冲区中的位置
int m_start_line;// 当前正在解析的行的起始位置
char m_write_buf[WRITE_BUFFER_SIZE];// 写缓冲区
int m_write_idx;// 写缓冲区中待发送的字节数
CHECK_STATE m_check_state;// 主状态机当前所处的状态
METHOD m_method;// 请求方法
char m_real_file[FILENAME_LEN];// 客户请求的目标文件的完整路径,其内容等于doc_root+m_url,doc_root是网站根目录
char *m_url;// 客户请求的目标文件的文件名
char *m_version;// HTTP协议版本号,我们仅支持HTTP/1.1
char *m_host;// 主机名
int m_content_length;// HTTP请求的消息体的长度
bool m_linger;// HTTP请求是否要求保持连接
char *m_file_address;// 客户请求的目标文件被mmap到内存中的起始位置
struct stat m_file_stat;// 目标文件的状态。通过它我们可以判断文件是否存在、是否为目录、是否可读,并获取文件大小等信息
struct iovec m_iv[2];// 采用writev来执行写操作,定义两个成员,其中一个指向m_write_buf,另一个指向m_file_address
int m_iv_count;// 表示被写内存块的数量
};


#endif

http_conn.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
#include "15_4_http_conn.h"

const char *ok_200_title = "OK";
const char *error_400_title = "Bad Request";
const char *error_400_form = "Your request has bad syntax or is inherently impossible to satisfy.\n";
const char *error_403_title = "Forbidden";
const char *error_403_form = "You do not have permission to get file from this server.\n";
const char *error_404_title = "Not Found";
const char *error_404_form = "The requested file was not found on this server.\n";
const char *error_500_title = "Internal Error";
const char *error_500_form = "There was an unusual problem serving the requested file.\n";

const char *doc_root = "/var/www/html";

int setnonblocking(int fd){
int old_option = fcntl(fd,F_GETFL);
int new_option = old_option | O_NONBLOCK;
fcntl(fd,F_SETFL,new_option);
return old_option;
}

void addfd(int epollfd,int fd,bool one_shot){
epoll_event event;
event.data.fd = fd;
event.events = EPOLLIN | EPOLLET | EPOLLRDHUP;
if(one_shot){
event.events |= EPOLLONESHOT;
}
epoll_ctl(epollfd,EPOLL_CTL_ADD,fd,&event);
setnonblocking(fd);
}

void removefd(int epollfd,int fd){
epoll_ctl(epollfd,EPOLL_CTL_DEL,fd,0);
close(fd);
}

void modfd(int epollfd,int fd,int ev){
epoll_event event;
event.data.fd = fd;
event.events = ev | EPOLLET | EPOLLONESHOT | EPOLLRDHUP;
epoll_ctl(epollfd,EPOLL_CTL_MOD,fd,&event);
}

int http_conn::m_epollfd = -1;
int http_conn::m_user_count = 0;

void http_conn::close_conn(bool real_close){
if(real_close && (m_sockfd != -1)){
removefd(m_epollfd,m_sockfd);
m_sockfd = -1;
m_user_count--;
}
}

void http_conn::init(int sockfd,const sockaddr_in &addr){
m_sockfd = sockfd;
m_address = addr;
addfd(m_epollfd,sockfd,true);
m_user_count++;
init();
}

void http_conn::init(){
m_check_state = CHECK_STATE_REQUESTLINE;
m_linger = false;
m_method = GET;
m_url = 0;
m_version = 0;
m_content_length = 0;
m_host = 0;
m_start_line = 0;
m_checked_idx = 0;
m_read_idx = 0;
m_write_idx = 0;
memset(m_read_buf,'\0',READ_BUFFER_SIZE);
memset(m_write_buf,'\0',WRITE_BUFFER_SIZE);
memset(m_real_file,'\0',FILENAME_LEN);
}
// 从状态机,用于解析出一行内容
http_conn::LINE_STATUS http_conn::parse_line(){
char temp;
for(;m_checked_idx < m_read_idx;++m_checked_idx){
temp = m_read_buf[m_checked_idx];
if(temp == '\r'){
if((m_checked_idx + 1) == m_read_idx){
return LINE_OPEN;
}
else if(m_read_buf[m_checked_idx + 1] == '\n'){
m_read_buf[m_checked_idx++] = '\0';
m_read_buf[m_checked_idx++] = '\0';
return LINE_OK;
}
return LINE_BAD;
}
else if(temp == '\n'){
if((m_checked_idx > 1) && (m_read_buf[m_checked_idx - 1] == '\r')){
m_read_buf[m_checked_idx - 1] = '\0';
m_read_buf[m_checked_idx++] = '\0';
return LINE_OK;
}
return LINE_BAD;
}
}
return LINE_OPEN;
}

// 循环读取客户数据,直到无数据可读或对方关闭连接
bool http_conn::read(){
if(m_read_idx >= READ_BUFFER_SIZE){
return false;
}
int bytes_read = 0;
while(true){
bytes_read = recv(m_sockfd,m_read_buf + m_read_idx,READ_BUFFER_SIZE - m_read_idx,0);
if(bytes_read == -1){
if(errno == EAGAIN || errno == EWOULDBLOCK){
break;
}
return false;
}
else if(bytes_read == 0){
return false;
}
m_read_idx += bytes_read;
}
return true;
}

// 解析HTTP请求
http_conn::HTTP_CODE http_conn::process_read(){
LINE_STATUS line_status = LINE_OK;
HTTP_CODE ret = NO_REQUEST;
char *text = 0;
while(((m_check_state == CHECK_STATE_CONTENT) && (line_status == LINE_OK)) || ((line_status = parse_line()) == LINE_OK)){
text = get_line();
m_start_line = m_checked_idx;
printf("got 1 http line: %s\n",text);
switch(m_check_state){
case CHECK_STATE_REQUESTLINE:{
ret = parse_request_line(text);
if(ret == BAD_REQUEST){
return BAD_REQUEST;
}
break;
}
case CHECK_STATE_HEADER:{
ret = parse_headers(text);
if(ret == BAD_REQUEST){
return BAD_REQUEST;
}
else if(ret == GET_REQUEST){
return do_request();
}
break;
}
case CHECK_STATE_CONTENT:{
ret = parse_content(text);
if(ret == GET_REQUEST){
return do_request();
}
line_status = LINE_OPEN;
break;
}
default:{
return INTERNAL_ERROR;
}
}
}
return NO_REQUEST;
}


// 解析请求行
http_conn::HTTP_CODE http_conn::parse_request_line(char *text){
m_url = strpbrk(text," \t");
if(!m_url){
return BAD_REQUEST;
}
*m_url++ = '\0';
char *method = text;
if(strcasecmp(method,"GET") == 0){
m_method = GET;
}
else{
return BAD_REQUEST;
}
m_url += strspn(m_url," \t");
m_version = strpbrk(m_url," \t");
if(!m_version){
return BAD_REQUEST;
}
*m_version++ = '\0';
m_version += strspn(m_version," \t");
if(strcasecmp(m_version,"HTTP/1.1") != 0){
return BAD_REQUEST;
}
if(strncasecmp(m_url,"http://",7) == 0){
m_url += 7;
m_url = strchr(m_url,'/');
}
if(!m_url || m_url[0] != '/'){
return BAD_REQUEST;
}
m_check_state = CHECK_STATE_HEADER;
return NO_REQUEST;
}

http_conn::HTTP_CODE http_conn::parse_content(char *text){
if(m_read_idx <= (m_content_length + m_checked_idx)){
text[m_read_idx - m_checked_idx] = '\0';
return GET_REQUEST;
}
return NO_REQUEST;
}

http_conn::HTTP_CODE http_conn::process_read(){
LINE_STATUS line_status = LINE_OK;
HTTP_CODE ret = NO_REQUEST;
char *text = 0;

while(((m_check_state == CHECK_STATE_CONTENT) && (line_status == LINE_OK)) || ((line_status = parse_line()) == LINE_OK)){
text = get_line();
m_start_line = m_checked_idx;
printf("got 1 http line: %s\n",text);

switch(m_check_state){
case CHECK_STATE_REQUESTLINE:{
ret = parse_request_line(text);
if(ret == BAD_REQUEST){
return BAD_REQUEST;
}
break;
}
case CHECK_STATE_HEADER:{
ret = parse_headers(text);
if(ret == BAD_REQUEST){
return BAD_REQUEST;
}
else if(ret == GET_REQUEST){
return do_request();
}
break;
}
case CHECK_STATE_CONTENT:{
ret = parse_content(text);
if(ret == GET_REQUEST){
return do_request();
}
line_status = LINE_OPEN;
break;
}
default:{
return INTERNAL_ERROR;
}
}
}
return NO_REQUEST;
}


//当得到一个完整、正确的HTTP请求时,我们就分析目标文件的属性。如果目标文件存在、对所有用户可读、且不是目录,则使用mmap将其映射到内存地址m_file_address处,并告诉调用者获取文件成功
http_conn::HTTP_CODE http_conn::do_request(){
strcpy(m_real_file,doc_root);
int len = strlen(doc_root);
strncpy(m_real_file + len,m_url,FILENAME_LEN - len - 1);
if(stat(m_real_file,&m_file_stat) < 0){
return NO_RESOURCE;
}
if(!(m_file_stat.st_mode & S_IROTH)){
return FORBIDDEN_REQUEST;
}
if(S_ISDIR(m_file_stat.st_mode)){
return BAD_REQUEST;
}
int fd = open(m_real_file,O_RDONLY);
m_file_address = (char *)mmap(0,m_file_stat.st_size,PROT_READ,MAP_PRIVATE,fd,0);
close(fd);
return FILE_REQUEST;
}

// 对内存映射区执行munmap操作
void http_conn::unmap(){
if(m_file_address){
munmap(m_file_address,m_file_stat.st_size);
m_file_address = 0;
}
}

// 写HTTP响应
bool http_conn::write(){
int temp = 0;
int bytes_have_send = 0;
int bytes_to_send = m_write_idx;
if(bytes_to_send == 0){
modfd(m_epollfd,m_sockfd,EPOLLIN);
init();
return true;
}
while(1){
temp = writev(m_sockfd,m_iv,m_iv_count);
if(temp <= -1){
if(errno == EAGAIN){
modfd(m_epollfd,m_sockfd,EPOLLOUT);
return true;
}
unmap();
return false;
}
bytes_to_send -= temp;
bytes_have_send += temp;
if(bytes_to_send <= bytes_have_send){
unmap();
if(m_linger){
init();
modfd(m_epollfd,m_sockfd,EPOLLIN);
return true;
}
else{
modfd(m_epollfd,m_sockfd,EPOLLIN);
return false;
}
}
}
}

// 向写缓冲区中写入待发送的数据
bool http_conn::add_response(const char *format,...){
if(m_write_idx >= WRITE_BUFFER_SIZE){
return false;
}
va_list arg_list;
va_start(arg_list,format);
int len = vsnprintf(m_write_buf + m_write_idx,WRITE_BUFFER_SIZE - 1 - m_write_idx,format,arg_list);
if(len >= (WRITE_BUFFER_SIZE - 1 - m_write_idx)){
return false;
}
m_write_idx += len;
va_end(arg_list);
return true;
}

// 添加状态行
bool http_conn::add_status_line(int status,const char *title){
return add_response("%s %d %s\r\n","HTTP/1.1",status,title);
}

// 添加消息报头
bool http_conn::add_headers(int content_length){
add_content_length(content_length);
add_linger();
add_blank_line();
}

// 添加Content-Length
bool http_conn::add_content_length(int content_length){
return add_response("Content-Length: %d\r\n",content_length);
}

// 添加Connection
bool http_conn::add_linger(){
return add_response("Connection: %s\r\n",(m_linger == true) ? "keep-alive" : "close");
}

// 添加空行
bool http_conn::add_blank_line(){
return add_response("%s","\r\n");
}

// 添加内容
bool http_conn::add_content(const char *content){
return add_response("%s",content);
}

// 根据服务器处理HTTP请求的结果,决定返回给客户端的内容
bool http_conn::process_write(HTTP_CODE ret){
switch(ret){
case INTERNAL_ERROR:{
add_status_line(500,error_500_title);
add_headers(strlen(error_500_form));
if(!add_content(error_500_form)){
return false;
}
break;
}
case BAD_REQUEST:{
add_status_line(400,error_400_title);
add_headers(strlen(error_400_form));
if(!add_content(error_400_form)){
return false;
}
break;
}
case NO_RESOURCE:{
add_status_line(404,error_404_title);
add_headers(strlen(error_404_form));
if(!add_content(error_404_form)){
return false;
}
break;
}
case FORBIDDEN_REQUEST:{
add_status_line(403,error_403_title);
add_headers(strlen(error_403_form));
if(!add_content(error_403_form)){
return false;
}
break;
}
case FILE_REQUEST:{
add_status_line(200,ok_200_title);
if(m_file_stat.st_size != 0){
add_headers(m_file_stat.st_size);
m_iv[0].iov_base = m_write_buf;
m_iv[0].iov_len = m_write_idx;
m_iv[1].iov_base = m_file_address;
m_iv[1].iov_len = m_file_stat.st_size;
m_iv_count = 2;
return true;
}
else{
const char *ok_string = "<html><body></body></html>";
add_headers(strlen(ok_string));
if(!add_content(ok_string)){
return false;
}
}
}
default:{
return false;
}
}
m_iv[0].iov_base = m_write_buf;
m_iv[0].iov_len = m_write_idx;
m_iv_count = 1;
return true;
}

//由线程池中的工作线程调用,这是处理HTTP请求的入口函数
void http_conn::process(){
HTTP_CODE read_ret = process_read();
if(read_ret == NO_REQUEST){
modfd(m_epollfd,m_sockfd,EPOLLIN);
return;
}
bool write_ret = process_write(read_ret);
if(!write_ret){
close_conn();
}
modfd(m_epollfd,m_sockfd,EPOLLOUT);
}

线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
#ifndef THREADPOOL_H
#define THREADPOOL_H

#include<list>
#include<cstdio>
#include<exception>//异常处理
#include<pthread.h>//线程
#include"14_2_locker.h"//线程同步机制

template<typename T>
class threadpool{
private:
int m_thread_number;//线程池中的线程数
int m_max_requests;//请求队列中允许的最大请求数
pthread_t *m_threads;//描述线程池的数组,大小为m_thread_number
std::list<T *> m_workqueue;//请求队列
locker m_queuelocker;//保护请求队列的互斥锁
sem m_queuestat;//是否有任务需要处理
bool m_stop;//是否结束线程

public:
threadpool(int thread_number = 8, int max_requests = 10000);
~threadpool();
// 往请求队列中添加任务
bool append(T *request);

private:
static void *worker(void *arg);
void run();

};


template<typename T>
threadpool<T>::threadpool(int thread_number, int max_requests)
:m_thread_number(thread_number),m_max_requests(max_requests),m_stop(false),m_threads(NULL)
{
if((thread_number<=0) || (max_requests<=0)){
throw std::exception();
}

m_threads = new pthread_t[m_thread_number];
if(!m_threads){
throw std::exception();
}

// 创建thread_number个线程,并将它们设置为脱离线程
for(int i=0;i<thread_number;++i){
printf("create the %dth thread\n",i);
if(pthread_create(m_threads+i,NULL,worker,this) != 0){
delete [] m_threads;
throw std::exception();
}
if(pthread_detach(m_threads[i])){
delete [] m_threads;
throw std::exception();
}
}
}

template<typename T>
threadpool<T>::~threadpool(){
delete [] m_threads;
m_stop = true;
}

template<typename T>
bool threadpool<T>::append(T *request){
m_queuelocker.lock();
if(m_workqueue.size() > m_max_requests){
m_queuelocker.unlock();
return false;
}
m_workqueue.push_back(request);
m_queuelocker.unlock();
m_queuestat.post();
return true;
}

template<typename T>
void *threadpool<T>::worker(void *arg){
threadpool *pool = (threadpool *)arg;
pool->run();
return pool;
}

template<typename T>
void threadpool<T>::run(){
while(!m_stop){
m_queuestat.wait(); // 等待有任务的到来,信号量控制阻塞
m_queuelocker.lock(); // 上锁,确保对队列的访问是线程安全的
if (m_workqueue.empty()) { // 如果任务队列为空,跳过
m_queuelocker.unlock();
continue;
}
T* request = m_workqueue.front(); // 从队列中取任务
m_workqueue.pop_front(); // 弹出队列头部任务
m_queuelocker.unlock(); // 解锁队列,其他线程可以继续操作队列
if(!request){
continue;
}
request->process();
}
}

#endif

互斥锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

class locker{
private:
pthread_mutex_t m_mutex;
public:
locker(){
if(pthread_mutex_init(&m_mutex,NULL) != 0) throw std::exception();
}
~locker(){
pthread_mutex_destroy(&m_mutex);
}
bool lock(){
return pthread_mutex_lock(&m_mutex) == 0;
}
bool unlock(){
return pthread_mutex_unlock(&m_mutex) == 0;
}
};

信号量

1
2
3
4
5
6
7
8
9
10
11
class sem{
private:
sem_t m_sem;
public:
sem(){
if(sem_init(&m_sem,0,0)!=0) throw std::exception();
}
~sem(){ sem_destroy(&m_sem); }
bool wait(){ return sem_wait(&m_sem) == 0; }
bool post(){ return sem_post(&m_sem) == 0; }
};

参考文章

  1. 100%弄明白5种IO模型 - 知乎

  2. 9.1 什么是零拷贝? | 小林coding

  3. linux高性能服务器编程-游双


© 2024 oymaster 使用 Stellar 创建

总访问量