I/O多路复用 - select、poll、epoll讲解(epoll工作图解介绍+红黑树)_epoll 红黑树-程序员宅基地

技术标签: c语言  运维  linux  服务器  unix  UNIX环境高级编程  


当有多个多种事件同一段时间发生时,多进程/多线程模式可以解决,但是创建进程和创建线程都是需要时间开销的。在编写服务器客户端程序的时候,如果服务器性能一般,但是客户端连接的又太多的时候,这会造成很大的代价。

该篇要说的多路复用就是解决这中问题的办法之一。通过多路复用来监听,是否有事件发生,并把发生的事件是什么告诉服务器端,然后执行相关操作。这种方法叫做多路复用。

多路复用有三种,下面一一介绍:select、poll以及epoll多路复用。


1- select多路复用

(1)工作原理

我们先讲解select的工作原理,然后再用代码讲解进行实践。

可以理解为select监视并等待多个文件描述符的属性发生变化,它监视的属性分3类,分别是read-fds(文件描述符有数据到来可读)、write_fds(文件描述符可写)、和except_fds(文件描述符异常)。调用后select函数会阻塞,直到有描述符就绪(有数据可读、可写、或者有错误异常),或者超时( timeout 指定等待时间)发生函数才返回。当select()函数返回后,可以通过遍历 fdset,来找到究竟是哪些文件描述符就绪。

函数原型:

#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
int select(int nfds, fd_set *read_fds, fd_set *write_fds, fd_set *except_fds, struct timeval *timeout);
  • nfds:
    指待测试的fd的总个数,它的值是待测试的最大文件描述符加1。Linux内核从0开始到max_fd-1扫描文件描述符,如果有数据出现事件(读、写、异常)将会返回;假设需要监测的文件描述符是8,9,10,那么Linux内核实际也要监测0-7,此时真正带测试的文件描述符是0-10总共11个,即max(8,9,10)+1,所以第一个参数是所有要监听的文件描述符中最大的+1;
  • read_fds、write_fds、except_fds:
    中间三个参数read_fds、write_fds和except_fds指定要让内核测试读、写和异常条件的fd集合,如果不需要测试的可以设置为NULL;
  • timeout:
    最后一个参数是设置select的超时时间,如果设置为NULL则永不超时。timeval结构体: struct timeval { long tv_sec; /* seconds 秒*/ long tv_usec; /* microseconds 微妙*/ };

操作文件描述符的几个函数:

在这里插入图片描述

注意:

在Linux内核有个参数__FD_SETSIZE定义了每个FD_SET的句柄个数中,这也意味着select所用到的FD_SET是有限的,也正是这个原因select()默认只能同时处理1024个客户端的连接请求: /linux/posix_types.h: #define __FD_SETSIZE 1024

(2)select-服务器编程

socket_server_socket.c服务器代码:

#include<stdio.h>
#include<string.h>
#include<stdlib.h>
#include<unistd.h>
#include<errno.h>
#include<ctype.h>
#include<time.h>
#include<pthread.h>
#include<getopt.h>
#include<libgen.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<netinet/in.h>

#define  ARRAY_SIZE(x)      ( sizeof(x)/sizeof(x[0]) )//数组的大小/第一个元素的大小=元素的个数

static inline void msleep(unsigned long ms);
static inline void print_usage(char *programname);
int socket_server_init( char *listen_ip, int listen_port);

int main(int argc, char *argv[])
{
    
	char     *programname = NULL;
	int        server_port = 0;
	int        daemon_run = 0;
	int        fds_array[1024];
	fd_set   rdset;
	int        maxfd = 0;
	int        found;
	int        listenfd,connfd;
	int        opt;
	int        rv;
	int        i,j;
	char     buf[1024];

	struct option long_option[] =
	{
    
		{
    "daemon", no_argument, NULL, 'b'},
		{
    "port", required_argument, NULL, 'p'},
		{
    "help", no_argument, NULL, 'h'},
		{
    NULL, 0, NULL, 0}
	};

	programname = basename(argv[0]);

	while( (opt = getopt_long(argc, argv, "bp:h", long_option, NULL))  != -1)
	{
    
		switch(opt)
		{
    
			case 'b':
				daemon_run = 1;
				break;

			case 'p':
				server_port = atoi(optarg);
				break;

			case 'h':
				print_usage(programname);
				return EXIT_SUCCESS;//EXIT_SUCCESS- successful execution of a
				//program (程序的成功执行)   EXIT_FAILUEEunsuccessful execution of a program (程序的不成功执行) 

			default:
					break;//default只有在case匹配失败的时候才会执行,位置没关系
		}
	}

	if( !server_port )
	{
    
		print_usage(programname);
		return -1;
	}

	if( (listenfd = socket_server_init(NULL, server_port)) < 0)
	{
    
		printf("ERROR: %s server listen on port %d failure\n", argv[0], server_port);
		return -2;
	}
	printf("%s server start to listen on port %d\n", programname, server_port);

	if(daemon_run)
	{
    
		daemon(0, 0);
	}

	for( i = 0; i < ARRAY_SIZE(fds_array); i++)
	{
    
		fds_array[i] = -1;//将数组里面的内容都设为-1,为什么不设为0,因为文件描述符也可能为0
	}
	fds_array[0] = listenfd;//刚开始只有fds_array【0】有内容listenfd
	
	for( ; ; )
	{
    
		FD_ZERO(&rdset);//清空rdset集合中的值
		for( i = 0; i < ARRAY_SIZE(fds_array); i++)
		{
    
			if(fds_array[i] < 0)//初始值为-1,<0表示还未被占用
			{
    
				continue;
			}
			maxfd = fds_array[i] >maxfd ? fds_array[i] : maxfd;//三目运算符
			FD_SET(fds_array[i], &rdset);//将数组中的描述符全部放到rdset集合中去,第一次将listenfd放到集合中去
		}

		rv = select(maxfd+1, &rdset, NULL, NULL, NULL);//不关心写,异常和超时;
		if(rv < 0)
		{
    
			printf("select get timeout");
			continue;
		}

		if( FD_ISSET(listenfd, &rdset) )//判断集合中是否有listenfd
		{
    
			if( (connfd = accept( listenfd, (struct sockaddr*)NULL, NULL)) < 0)
			{
    
				printf("accept new client failure:%s\n", strerror(errno));
				continue;
			}
			found = 0;
			for(i=0; i<ARRAY_SIZE(fds_array); i++)
			{
    
				if(fds_array[i] < 0)
				{
    
					printf("accept new client[%d] and add it into array\n", connfd);
					fds_array[i] = connfd;
					found =1;
					break;
				}
			}

			if( !found )
			{
    
				printf("accept new client[%d] but full, so refuse it\n", connfd);
				close(connfd);
			}
		}

		else/*data arrive from already connected client*/
		{
    
			for(i=0; i<ARRAY_SIZE(fds_array); i++)
			{
    
				if(fds_array[i]<0 || !FD_ISSET(fds_array[i], &rdset) )
				{
    
					continue;
				}
                memset(buf, 0, sizeof(buf));
				if( (rv = read(fds_array[i], buf, sizeof(buf))) <= 0 )
				{
    
					printf("socket[%d] read failure or get disconnect.\n", fds_array[i]);
					close(fds_array[i]);
					fds_array[i] = -1;
				}
				else
				{
    
					printf("socket[%d] read get %d bytes data:%s\n", fds_array[i], rv, buf);
					for(j=0; j<rv; j++)
					{
    
						buf[j] = toupper(buf[j]);
					}

					if( (write(fds_array[i], buf, rv)) < 0)
					{
    
						printf("socket[%d] write failure: %s\n", fds_array[i], strerror(errno));
						close(fds_array[i]);
						fds_array[i] = -1;
					}
				}
			}
		}
	}
Cleanup:
	close(listenfd);
	return 0;
}

static inline void print_usage(char *programname)//inline函数也称为内联函数或内嵌函数,_inline定义的类的内联函数,函数代码被放入符号调用表,使用时直接展开,不需要调用,即在编译期间将所调用的函数的代码直接嵌入到主调函数中,是一种以空间换时间的函数。如果不加static,则表示该函数有可能会被其他编译单元所调用,所以一定会产生函数本身的代码。所以加了static,一般可令可执行文件变小。
{
    
	printf("Usage: %s [OPTION]...\n", programname);
	printf("%s is a socket server program, which used to verify client and echo back string from it\n", programname);
	printf(" -b[daemon] set program running on background");
	printf(" -p[port] socket server port address\n");
	printf(" -h[help] Display this help information\n");

	printf("\nExample: %s -b -p 8900\n", programname);
		return ;
}

int socket_server_init( char *listen_ip, int listen_port)
{
    
	struct sockaddr_in     serveraddr;
	int      listenfd;
	int      rv = 0;
	int      on = 1;

	if( (listenfd = socket (AF_INET, SOCK_STREAM, 0))< 0)
	{
    
		printf("Use socket() to create a TCP socket failure: %s\n", strerror(errno));
		return -1;
	}
	setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));

	memset(&serveraddr, 0, sizeof(serveraddr));
	serveraddr.sin_family = AF_INET;
	serveraddr.sin_port = htons(listen_port);//主机字节序转换为网络字节序,s——short

	if( !listen_ip )
	{
    
		serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);//监听所有的IP
	}
	else
	{
    
		if( inet_pton(AF_INET, listen_ip, &serveraddr.sin_addr) <= 0 )//inet_pton函数原型如下[将"点分十进制" -> "整数"]
		{
    
			printf("inet_pton() set listen IP address failure.\n");
			rv = -2;
			goto Cleanup;
		}
	}

	if(bind(listenfd, (struct sockaddr *) &serveraddr, sizeof(serveraddr)) < 0 )
	{
    
		printf("Use bind() to bind the TCP socket failure: %s\n", strerror(errno));
		rv = -3;
		goto Cleanup;
	}

	if(listen(listenfd, 13) < 0)
	{
    
		printf("Use bind() to bind the TCP socket failure:%s\n", strerror(errno));
		rv = -4;
		goto Cleanup;
	}
Cleanup:
	if(rv < 0)
	{
    
		close(listenfd);
	}
	else
	{
    
		rv = listenfd;
	}
	return rv;
}

static inline void msleep(unsigned long ms)
{
    
	struct timeval   tv;
	tv.tv_sec = ms/1000;
	tv.tv_usec = (ms%1000)*1000;

	select(0, NULL, NULL, NULL, &tv);
}

通过tcp-test-tools软件进行连接,两个客户端都可连接并且发送消息不冲突。

 ./socket_server_select -p 12345
 
socket_server_select server start to listen on port 12345
accept new client[4] and add it into array
accept new client[5] and add it into array
socket[4] read get 12 bytes data:I am 4 hello
socket[5] read get 12 bytes data:I am 5 hello

在这里插入图片描述


2- poll多路复用

(1)工作原理

我们先讲解poll的工作原理,然后再用代码讲解进行实践。

poll()的机制与 select() 类似,与 select() 在本质上没有多大差别,管理多个描述符也是进行轮询,根据描述符的状态进行处理,但是 poll() 没有最大文件描述符数量的限制(但是数量过大后性能也是会下降)。poll() 和 select() 同样存在一个缺点就是,包含大量文件描述符的数组被整体复制于用户态和内核的地址空间之间,而不论这些文件描述符是否就绪,它的开销随着文件描述符数量的增加而线性增大。

函数原型:

#include <poll.h>
struct pollfd{
    
 int fd; /* 文件描述符 */
 short events; /* 等待的事件 */
 short revents; /* 实际发生了的事件 */
} ;
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
  • nfds:
    指定数组中监听的元素个数;
  • timeout:
    指定等待的毫秒数,无论I/O是否准备好,poll都会返回。timeout指定为负数值表示无限超时,使poll()一直挂起直到一个指定事件发生;timeout为0指示poll调用立即返回并列出准备好I/O的文件描述符,但并不等待其它的事件。
  • fds:
    用来指向一个struct pollfd类型的数组,每一个pollfd结构体指定了一个被监视的文件描述符,指示poll()监视多个文件描述符。每个结构体的events域是监视该文件描述符的事件掩码,由用户来设置这个域。revents域是文件描述符的操作结果事件掩码,内核在调用返回时设置这个域,events域中请求的任何事件都可能在revents域中返回。下表列出指定 events 标志以及测试 revents 标志的一些常值:

在这里插入图片描述
返回值注意事项:
该函数成功调用时,poll()返回结构体中revents域不为0的文件描述符个数;如果在超时前没有任何事件发生,poll()返回0;失败时,poll()返回-1,并设置errno为下列值之一:

  • EBADF:一个或多个结构体中指定的文件描述符无效。
  • EFAULTfds:指针指向的地址超出进程的地址空间。
  • EINTR:请求的事件之前产生一个信号,调用可以重新发起。
  • EINVALnfds:参数超出PLIMIT_NOFILE值。
  • ENOMEM:可用内存不足,无法完成请求。

(2)poll-服务器编程

socket_server_poll.c服务器代码:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <ctype.h>
#include <time.h>
#include <pthread.h>
#include <getopt.h>
#include <libgen.h>
#include <sys/types.h> 
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <poll.h>

#define ARRAY_SIZE(x) (sizeof(x)/sizeof(x[0]))

static inline void print_usage(char *progname);
int socket_server_init(char *listen_ip, int listen_port);

int main(int argc, char **argv)
{
    
    int listenfd, connfd;
    int serv_port = 0;
    int daemon_run = 0;
    char *progname = NULL;
    int opt;
    int rv;
    int i, j;
    int found;
    int max;
    char buf[1024];
    struct pollfd fds_array[1024];
    struct option long_options[] =
    {
     
        {
    "daemon", no_argument, NULL, 'b'},
        {
    "port", required_argument, NULL, 'p'},
        {
    "help", no_argument, NULL, 'h'},
        {
    NULL, 0, NULL, 0}
    }; 
    progname = basename(argv[0]);
    /* Parser the command line parameters */
    while ((opt = getopt_long(argc, argv, "bp:h", long_options, NULL)) != -1)
    {
     
        switch (opt)
        {
     
            case 'b':
                daemon_run=1;
                break;
            case 'p':
                serv_port = atoi(optarg);
                break;
            case 'h': /* Get help information */
                print_usage(progname);
                return EXIT_SUCCESS;
            default:
                break;
        } 
    } 
    if( !serv_port )
    {
     
        print_usage(progname);
        return -1;
    }
    if( (listenfd=socket_server_init(NULL, serv_port)) < 0 )
    {
    
        printf("ERROR: %s server listen on port %d failure\n", argv[0],serv_port);
        return -2;
    }
    printf("%s server start to listen on port %d\n", argv[0],serv_port);
    /* set program running on background */
    if( daemon_run )
    {
    
        daemon(0, 0);
    }
    for(i=0; i<ARRAY_SIZE(fds_array) ; i++)
    {
    
        fds_array[i].fd=-1;
    }
    fds_array[0].fd = listenfd;
    fds_array[0].events = POLLIN;
    max = 0;
    for ( ; ; )
    {
    
        /* program will blocked here */
        rv = poll(fds_array, max+1, -1);
        if(rv < 0)
        {
    
            printf("select failure: %s\n", strerror(errno));
            break;
        }
        else if(rv == 0)
        {
    
            printf("select get timeout\n");
            continue;
        }
        /* listen socket get event means new client start connect now */
        if (fds_array[0].revents & POLLIN)
        {
    
            if( (connfd=accept(listenfd, (struct sockaddr *)NULL, NULL)) < 0)
            {
    
                printf("accept new client failure: %s\n", strerror(errno));
                continue;
            }
            found = 0;
            for(i=1; i<ARRAY_SIZE(fds_array) ; i++)
            {
    
                if( fds_array[i].fd < 0 )
                {
    
                    printf("accept new client[%d] and add it into array\n", connfd );
                    fds_array[i].fd = connfd;
                    fds_array[i].events = POLLIN;
                    found = 1;
                    break;
                }
            }
            if( !found )
            {
    
                printf("accept new client[%d] but full, so refuse it\n", connfd);
                close(connfd);
                continue;
            }

            max = i>max ? i : max;
            if (--rv <= 0)
                continue;
        }
        else /* data arrive from already connected client */
        {
    
            for(i=1; i<ARRAY_SIZE(fds_array); i++)
            {
    
                memset(buf, 0, sizeof(buf));
                if( fds_array[i].fd < 0 )
                    continue;
                if( (rv=read(fds_array[i].fd, buf, sizeof(buf))) <= 0)
                {
    
                    printf("socket[%d] read failure or get disconncet.\n", fds_array[i].fd);
                    close(fds_array[i].fd);
                    fds_array[i].fd = -1;
                }
                else
                {
    
                    printf("socket[%d] read get %d bytes data: %s\n", fds_array[i].fd, rv, buf);
                    /* convert letter from lowercase to uppercase */
                    for(j=0; j<rv; j++)
                        buf[j]=toupper(buf[j]);
                    if( write(fds_array[i].fd, buf, rv) < 0 )
                    {
    
                        printf("socket[%d] write failure: %s\n", fds_array[i].fd, strerror(errno));
                        close(fds_array[i].fd);
                        fds_array[i].fd = -1;
                    }
                }
            }
        }
    }
CleanUp:
    close(listenfd);
    return 0;
}
static inline void print_usage(char *progname)
{
    
    printf("Usage: %s [OPTION]...\n", progname);

    printf(" %s is a socket server program, which used to verify client and echo back string from it\n",
            progname);
    printf("\nMandatory arguments to long options are mandatory for short options too:\n");

    printf(" -b[daemon ] set program running on background\n");
    printf(" -p[port ] Socket server port address\n");
    printf(" -h[help ] Display this help information\n");

    printf("\nExample: %s -b -p 8900\n", progname);
    return ;
}
int socket_server_init(char *listen_ip, int listen_port)
{
    
    struct sockaddr_in servaddr;
    int rv = 0;
    int on = 1;
    int listenfd;
    if ( (listenfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
    {
    
        printf("Use socket() to create a TCP socket failure: %s\n", strerror(errno));
        return -1;
    }
    /* Set socket port reuseable, fix 'Address already in use' bug when socket server restart */
    setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET; 
    servaddr.sin_port = htons(listen_port);
    if( !listen_ip ) /* Listen all the local IP address */
    {
    
        servaddr.sin_addr.s_addr = htonl(INADDR_ANY); 
    }
    else /* listen the specified IP address */
    {
    
        if (inet_pton(AF_INET, listen_ip, &servaddr.sin_addr) <= 0)
        {
    
            printf("inet_pton() set listen IP address failure.\n");
            rv = -2;
            goto CleanUp;
        }
    }
    if(bind(listenfd, (struct sockaddr *) &servaddr, sizeof(servaddr)) < 0)
    {
    
        printf("Use bind() to bind the TCP socket failure: %s\n", strerror(errno));
        rv = -3;
        goto CleanUp;
    }
    if(listen(listenfd, 13) < 0)
    {
    
        printf("Use bind() to bind the TCP socket failure: %s\n", strerror(errno));
        rv = -4;
        goto CleanUp;
    }
CleanUp:
    if(rv<0)
        close(listenfd);
    else
        rv = listenfd;
    return rv;
}

通过tcp-test-tools软件进行连接,两个客户端都可连接并且发送消息不冲突。

./socket_server_poll -p 12345

./socket_server_poll server start to listen on port 12345
accept new client[4] and add it into array
accept new client[5] and add it into array
socket[4] read get 12 bytes data: I am 4 hello
socket[5] read get 12 bytes data: I am 5 hello
socket[4] read get 18 bytes data: I am 4 hello hello
socket[5] read get 18 bytes data: I am 5 hello hello

在这里插入图片描述


3- epoll多路复用

(1)工作原理

我们先讲解epoll的工作原理,然后再用代码讲解进行实践。

在linux 没有实现epoll事件驱动机制之前,我们一般选择用select或者poll等IO多路复用的方法来实现并发服务程序。自Linux 2.6内核正式引入epoll以来,epoll已经成为了目前实现高性能网络服务器的必备技术,在大数据、高并发、集群等一些名词唱得火热之年代,select和poll的用武之地越来越有限,风头已经被epoll占尽。

epoll的设计和实现与select完全不同。epoll通过在Linux内核中申请一个简易的文件系统,把原先的select/poll调用分成了3个部分(三个函数):
创建epoll实例:epoll_create()

#include <sys/epoll.h>
int epoll_create(int size);

系统调用epoll_create()创建了一个新的epoll实例,其对应的兴趣列表初始化为空。若成功返回文件描述符,若出错返回-1。参数size指定了我们想要通过epoll实例来检查的文件描述符个数。该参数并不是一个上限,而是告诉内核应该如何为内部数据结构划分初始大小。
作为函数返回值,epoll_create()返回了代表新创建的epoll实例的文件描述符。这个文件描述符在其他几个epoll系统调用中用
来表示epoll实例。当这个文件描述符不再需要时,应该通过close()来关闭。

修改epoll的兴趣列表:epoll_ctl()

#include <sys/epoll.h>
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *ev);

系统调用epoll_ctl()能够修改由文件描述符epfd所代表的epoll实例中的兴趣列表。若成功返回0,若出错返回-1。

  • epfd:
    第一个参数epfd是epoll_create()的返回值;

  • op:
    第二个参数op用来指定需要执行的操作,它可以是如下几种值:
    在这里插入图片描述

  • fd:
    第三个参数fd指明了要修改兴趣列表中的哪一个文件描述符的设定。

  • ev:
    第四个参数ev是指向结构体epoll_event的指针,结构体的定义如下:

typedef union epoll_data{
    
 void *ptr; /* Pointer to user-defind data */
 int fd; /* File descriptor */
 uint32_t u32; /* 32-bit integer */
 uint64_t u64; /* 64-bit integer */
} epoll_data_t;

struct epoll_event{
    
 uint32_t events; /* epoll events(bit mask) */
 epoll_data_t data; /* User data */
};

事件等待:epoll_wait()

#include <sys/epoll.h>
int epoll_wait(int epfd, struct epoll_event *evlist, int maxevents, int timeout);

系统调用epoll_wait()返回epoll实例中处于就绪态的文件描述符信息,单个epoll_wait()调用能够返回多个就绪态文件描述符的信息。调用成功后epoll_wait()返回数组evlist中的元素个数,如果在timeout超时间隔内没有任何文件描述符处于就绪态的话就返回0,出错时返回-1并在errno中设定错误码以表示错误原因。

  • epfd:
    第一个参数epfd是epoll_create()的返回值;

  • evlist:
    第二个参数evlist所指向的结构体数组中返回的是有关就绪态文件描述符的信息,数组evlist的空间由调用者负责申请;

  • maxevents:
    第三个参数maxevents指定所evlist数组里包含的元素个数;

  • timeout
    第四个参数timeout用来确定epoll_wait()的阻塞行为,有如下几种:

    timeout = -1:调用将一直阻塞,直到兴趣列表中的文件描述符上有事件产生或者直到捕获到一个信号为止。
    timeout =  0:执行一次非阻塞式地检查,看兴趣列表中的描述符上产生了哪个事件。
    timeout >  0:调用将阻塞至多timeout毫秒,直到文件描述符上有事件发生,或者直到捕获到一个信号为止。  
    

(2)select-服务器编程

socket_server_epoll.c服务器代码:

#include<stdio.h>
#include<string.h>
#include<stdlib.h>
#include<unistd.h>
#include<errno.h>
#include<ctype.h>
#include<time.h>
#include<pthread.h>
#include<getopt.h>
#include<libgen.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<netinet/in.h>
#include<sys/epoll.h>
#include<sys/resource.h>

#define MAX_EVENTS         512
#define  ARRAY_SIZE(x)      ( sizeof(x)/sizeof(x[0]) )//数组的大小/第一个元素的大小=元素的个数

static inline void print_usage(char *programname);
int socket_server_init( char *listen_ip, int listen_port);
void set_socket_rlimit(void);

int main(int argc, char *argv[])
{
    
	char     *programname = NULL;
	int        server_port = 0;
	int        daemon_run = 0;
	int        found;
	int        listenfd,connfd;
	int        opt;
	int        rv;
	int        i,j;
	char     buf[1024];

	int       epollfd;
	struct  epoll_event    event;
	struct  epoll_event    event_array[MAX_EVENTS];
	int       events;

	struct option long_option[] =
	{
    
		{
    "daemon", no_argument, NULL, 'b'},
		{
    "port", required_argument, NULL, 'p'},
		{
    "help", no_argument, NULL, 'h'},
		{
    NULL, 0, NULL, 0}
	};

	programname = basename(argv[0]);

	while( (opt = getopt_long(argc, argv, "bp:h", long_option, NULL))  != -1)
	{
    
		switch(opt)
		{
    
			case 'b':
				daemon_run = 1;
				break;

			case 'p':
				server_port = atoi(optarg);
				break;

			case 'h':
				print_usage(programname);
				return EXIT_SUCCESS;//EXIT_SUCCESS  successful execution of a
						    //program (程序的成功执行)   EXIT_FAILUEE  unsuccessful execution of a program (程序的不成功执行) 

			default:
				break;//default只有在case匹配失败的时候才会执行,位置没关系
		}
	}

	if( !server_port )
	{
    
		print_usage(programname);
		return -1;
	}

	set_socket_rlimit();

	if( (listenfd = socket_server_init(NULL, server_port)) < 0)
	{
    
		printf("ERROR: %s server listen on port %d failure\n", argv[0], server_port);
		return -2;
	}
	printf("%s server start to listen on port %d\n", argv[0], server_port);

	if(daemon_run < 0)
	{
    
		daemon(0, 0);
	}

	if( (epollfd = epoll_create(MAX_EVENTS)) < 0 )
			/*listen socket get event means new client start connect now*/

	{
    
		printf("epoll_create() failure: %s\n", strerror(errno));
		return -3;
	}
	event.events = EPOLLIN;
	event.data.fd = listenfd;

	if( epoll_ctl(epollfd, EPOLL_CTL_ADD, listenfd, &event) < 0 )
	{
    
		printf("epoll:%d,listenfd:%d\n", epollfd, listenfd);
		printf("epoll add listen socket failure: %s\n", strerror(errno));
		return -4;
	}

	for( ; ; )
	{
    
		/*program will broked here*/
		events = epoll_wait( epollfd, event_array, MAX_EVENTS, -1);
		if( events < 0 )
		{
    
			printf("epoll failure: %s\n", strerror(errno));
			break;
		}
		else if( events == 0)
		{
    
			printf("epoll get timeout\n");
			continue;
		}

		for( i=0; i<events; i++)
		{
    
			if( (event_array[i].events&EPOLLERR) || (event_array[i].events&EPOLLHUP) )
			{
    
				printf("epoll_wait get error on fd[%d]: %s\n", event_array[i].data.fd, strerror(errno) );
				epoll_ctl(epollfd, EPOLL_CTL_DEL, event_array[i].data.fd, NULL);
				close(event_array[i].data.fd);
			}

			/*listen socket get event means new client start connect now*/
			if( event_array[i].data.fd == listenfd )
			{
    
				if( (connfd=accept(listenfd, (struct sockaddr *)NULL, NULL ) )< 0 )
				{
    
					printf("accept new client failure:%s\n", strerror(errno) );
					continue;
				}
				event.data.fd = connfd;
				event.events = EPOLLIN;
				if( epoll_ctl( epollfd, EPOLL_CTL_ADD, connfd, &event) < 0 )
				{
    
					printf("epoll add client socket failure: %s\n", strerror(errno) );
					close(event_array[i].data.fd);
					continue;
				}
				printf("epoll add new client socket[%d] ok.\n",connfd);
			}

			else
			{
    
                memset(buf, 0, sizeof(buf));
				if( (rv = read(event_array[i].data.fd, buf, sizeof(buf))) < 0 )
				{
    
					printf("socket[%d] read failure or get disconnected and will be removed.\n", event_array[i].data.fd);
					epoll_ctl(epollfd, EPOLL_CTL_DEL, event_array[i].data.fd, NULL);
					close(event_array[i].data.fd);
					continue;
				}

				else
				{
    
					printf("socket[%d] read get %d bytes data: %s\n", event_array[i].data.fd, rv, buf);

					for(j=0; j<rv; j++)
					{
    
						buf[j] = toupper(buf[j]);
					}
					if(write(event_array[i].data.fd, buf, rv) < 0)
					{
    
						printf("socket[%d] write failure: %s\n", event_array[i].data.fd, strerror(errno) );
						epoll_ctl(epollfd, EPOLL_CTL_DEL, event_array[i].data.fd, NULL);
						close(event_array[i].data.fd);
					}
				}
			}
		}
	}	
CleanUp:
	close(listenfd);
	return 0;
}

static inline void print_usage(char *programname)
{
    
	printf("Usage: %s [OPTION]...\n", programname);
	printf("%s is a socket server program, which used to verify client and echo back string from it\n", programname);
	printf(" -b[daemon] set program running on background");
	printf(" -p[port] socket server port address\n");
	printf(" -h[help] Display this help information\n");

	printf("\nExample: %s -b -p 8900\n", programname);
	return ;
}

int socket_server_init( char *listen_ip, int listen_port)
{
    
	struct sockaddr_in     serveraddr;
	int      listenfd;
	int      rv = 0;
	int      on = 1;

	if( (listenfd = socket (AF_INET, SOCK_STREAM, 0))< 0)
	{
    
		printf("Use socket() to create a TCP socket failure: %s\n", strerror(errno));
		return -1;
	}
	setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));

	memset(&serveraddr, 0, sizeof(serveraddr));
	serveraddr.sin_family = AF_INET;
	serveraddr.sin_port = htons(listen_port);//主机字节序转换为网络字节序,s——short

	if( !listen_ip )
	{
    
		serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);//监听所有的IP
	}
	else
	{
    
		if( inet_pton(AF_INET, listen_ip, &serveraddr.sin_addr) <= 0 )//inet_pton函数原型如下[将"点分十进制" -> "整数"]
		{
    
			printf("inet_pton() set listen IP address failure.\n");
			rv = -2;
			goto Cleanup;
		}
	}

	if(bind(listenfd, (struct sockaddr *) &serveraddr, sizeof(serveraddr)) < 0 )
	{
    
		printf("Use bind() to bind the TCP socket failure: %s\n", strerror(errno));
		rv = -3;
		goto Cleanup;
	}

	if(listen(listenfd, 13) < 0)
	{
    
		printf("Use bind() to bind the TCP socket failure:%s\n", strerror(errno));
		rv = -4;
		goto Cleanup;
	}

Cleanup:
	if(rv < 0)
	{
    
		close(listenfd);
	}
	else
	{
    
		rv = listenfd;
	}
	return rv;
}

void set_socket_rlimit(void)
{
    
	struct rlimit limit = {
    0};

	getrlimit(RLIMIT_NOFILE, &limit);
	limit.rlim_cur = limit.rlim_max;
	setrlimit(RLIMIT_NOFILE, &limit);

	printf("set socket open fd max count to %ld\n", limit.rlim_max);
}

通过tcp-test-tools软件进行连接,两个客户端都可连接并且发送消息不冲突。

./socket_server_epoll -p 12345

set socket open fd max count to 1048576
./socket_server_epoll server start to listen on port 12345
epoll add new client socket[5] ok.
socket[5] read get 24 bytes data: I am 5 hello hello hello
epoll add new client socket[6] ok.
socket[6] read get 24 bytes data: I am 6 hello hello hello

在这里插入图片描述


4- epoll工作原理底层介绍

(1)工作机制

epoll的实现机制与select/poll机制完全不同。

epoll是Linux内核为处理大批量文件描述符而作了改进的poll,是Linux下多路复用IO接口select/poll的增强版本,它能显著提高程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率。另一点原因就是获取事件的时候,它无须遍历整个被侦听的描述符集,只要遍历那些被内核IO事件异步唤醒而加入Ready队列的描述符集合就行了。

epoll除了提供select/poll那种IO事件的水平触发(Level Triggered)外,还提供了边缘触发(Edge Triggered),这就使得用户空间程序有可能缓存IO状态,减少epoll_wait/epoll_pwait的调用,提高应用程序效率。

  • LT(level triggered)水平触发:是缺省的工作方式。在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知你的,所以,这种模式编程出错误可能性要小一点。传统的select/poll都是这种模型的代表。
  • ET (edge-triggered)边缘触发:是高速工作方式。在这种模式下,当描述符从未就绪变为就绪时,内核通过epoll告诉你。然后它会假设你知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知,直到你做了某些操作导致那个文件描述符不再为就绪状态了(比如,你在发送,接收或者接收请求,或者发送接收的数据少于一定量时导致了一个EWOULDBLOCK 错误)。但是请注意,如果一直不对这个fd作IO操作(从而导致它再次变成未就绪),内核不会发送更多的通知(only once)。

设想一下如下场景:有100万个客户端同时与一个服务器进程保持着TCP连接。而每一时刻,通常只有几百上千个TCP连接是活跃的(事实上大部分场景都是这种情况)。如何实现这样的高并发?在select/poll时代,服务器进程每次都把这100万个连接告诉操作系统(从用户态复制句柄数据结构到内核态),让操作系统内核去查询这些套接字上是否有事件发生,轮询完后,再将句柄数据复制到用户态,让服务器应用程序轮询处理已发生的网络事件,这一过程资源消耗较大,因此,select/poll一般只能处理几千的并发连接。

(2)epoll工作图解

在这里插入图片描述

  1. epollfd = epoll_create()内核创建epoll实例(创建红黑树Red_Tree和就绪链表Read_List);
  2. epoll_ctl(epollfd, EPOLL_CTL_ADD, listenfd, &event)对红黑树进行操作,添加listenfd节点;
  3. events = epoll_wait( epollfd, event_array, MAX_EVENTS, -1)内核查找红黑树中就绪的socket,放入就绪列表;然后就绪列表将类容复制到struct epoll_event List数组中;
  4. 然后对数组中需要处理的socket直接进行判断读写操作。

至于红黑树具体是什么,怎么实现的,等我弄懂了再分享…好吧还是去看看吧,但是增删改查太麻烦了,有点难受,先理解一下浅层面的吧。

真心想了解的可以参考这篇文章:【数据结构】史上最好理解的红黑树讲解,让你彻底搞懂红黑树

(3)红黑树浅层理解

我们一步一步探索了解一下红黑树…

二叉搜索树
一棵空树或者满足以下性质的二叉树被称之为二叉搜索树:

  • 如果左子树不为空,则左子树所有结点值都小于根结点的值
  • 如果右子树不为空,则右子树所有结点值都大于或等于根结点的值
  • 任意一棵子树也是一棵二叉搜索树

在这里插入图片描述
但是极端情况下的搜索效率很低,于是就有了平衡二叉树(AVL数)

平衡二叉树(AVL数)
平衡树:任意节点的子树的高度差都小于等于1;

平衡二叉树:满足二叉树以及平衡树的特点,就是两者的结合体嘛。

平衡二叉树可以有效的减少二叉树的深度,从而提高了查询。但是效率严格平衡,代价高,还是不推荐。

红黑树(Red Black Tree R-B Tree)
一种特化的平衡二叉树(AVL树),在插入和删除时通过特定操作保持二叉查找树的相对平衡,从而获得较高的查找性能。
在这里插入图片描述
符合二叉树的基本特征,同时还具备以下特点:

  • 节点非黑即红 (每个节点要么是黑色,要么是红色)
  • 其根节点是黑色
  • 叶子节点是黑色(为了简单期间,一般会省略该节点)
  • 相邻节点不同为红色(红色节点的子节点必是黑色)
  • 从一个节点到该节点的下叶子节点的所有路径上包含的黑节点数量相等(这一点是平衡的关键)

记忆方法:黑根黑叶红不邻,同祖等高只数黑


版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/m0_51429770/article/details/129364697

智能推荐

18个顶级人工智能平台-程序员宅基地

文章浏览阅读1w次,点赞2次,收藏27次。来源:机器人小妹  很多时候企业拥有重复,乏味且困难的工作流程,这些流程往往会减慢生产速度并增加运营成本。为了降低生产成本,企业别无选择,只能自动化某些功能以降低生产成本。  通过数字化..._人工智能平台

electron热加载_electron-reloader-程序员宅基地

文章浏览阅读2.2k次。热加载能够在每次保存修改的代码后自动刷新 electron 应用界面,而不必每次去手动操作重新运行,这极大的提升了开发效率。安装 electron 热加载插件热加载虽然很方便,但是不是每个 electron 项目必须的,所以想要舒服的开发 electron 就只能给 electron 项目单独的安装热加载插件[electron-reloader]:// 在项目的根目录下安装 electron-reloader,国内建议使用 cnpm 代替 npmnpm install electron-relo._electron-reloader

android 11.0 去掉recovery模式UI页面的选项_android recovery 删除 部分菜单-程序员宅基地

文章浏览阅读942次。在11.0 进行定制化开发,会根据需要去掉recovery模式的一些选项 就是在device.cpp去掉一些选项就可以了。_android recovery 删除 部分菜单

mnn linux编译_mnn 编译linux-程序员宅基地

文章浏览阅读3.7k次。https://www.yuque.com/mnn/cn/cvrt_linux_mac基础依赖这些依赖是无关编译选项的基础编译依赖• cmake(3.10 以上)• protobuf (3.0 以上)• 指protobuf库以及protobuf编译器。版本号使用 protoc --version 打印出来。• 在某些Linux发行版上这两个包是分开发布的,需要手动安装• Ubuntu需要分别安装 libprotobuf-dev 以及 protobuf-compiler 两个包•..._mnn 编译linux

利用CSS3制作淡入淡出动画效果_css3入场效果淡入淡出-程序员宅基地

文章浏览阅读1.8k次。CSS3新增动画属性“@-webkit-keyframes”,从字面就可以看出其含义——关键帧,这与Flash中的含义一致。利用CSS3制作动画效果其原理与Flash一样,我们需要定义关键帧处的状态效果,由CSS3来驱动产生动画效果。下面讲解一下如何利用CSS3制作淡入淡出的动画效果。具体实例可参考刚进入本站时的淡入效果。1. 定义动画,名称为fadeIn@-webkit-keyf_css3入场效果淡入淡出

计算机软件又必须包括什么,计算机系统应包括硬件和软件两个子系统,硬件和软件又必须依次分别包括______?...-程序员宅基地

文章浏览阅读2.8k次。计算机系统应包括硬件和软件两个子系统,硬件和软件又必须依次分别包括中央处理器和系统软件。按人的要求接收和存储信息,自动进行数据处理和计算,并输出结果信息的机器系统。计算机是脑力的延伸和扩充,是近代科学的重大成就之一。计算机系统由硬件(子)系统和软件(子)系统组成。前者是借助电、磁、光、机械等原理构成的各种物理部件的有机组合,是系统赖以工作的实体。后者是各种程序和文件,用于指挥全系统按指定的要求进行..._计算机系统包括硬件系统和软件系统 软件又必须包括

随便推点

进程调度(一)——FIFO算法_进程调度fifo算法代码-程序员宅基地

文章浏览阅读7.9k次,点赞3次,收藏22次。一 定义这是最早出现的置换算法。该算法总是淘汰最先进入内存的页面,即选择在内存中驻留时间最久的页面予以淘汰。该算法实现简单,只需把一个进程已调入内存的页面,按先后次序链接成一个队列,并设置一个指针,称为替换指针,使它总是指向最老的页面。但该算法与进程实际运行的规律不相适应,因为在进程中,有些页面经常被访问,比如,含有全局变量、常用函数、例程等的页面,FIFO 算法并不能保证这些页面不被淘汰。这里,我_进程调度fifo算法代码

mysql rownum写法_mysql应用之类似oracle rownum写法-程序员宅基地

文章浏览阅读133次。rownum是oracle才有的写法,rownum在oracle中可以用于取第一条数据,或者批量写数据时限定批量写的数量等mysql取第一条数据写法SELECT * FROM t order by id LIMIT 1;oracle取第一条数据写法SELECT * FROM t where rownum =1 order by id;ok,上面是mysql和oracle取第一条数据的写法对比,不过..._mysql 替换@rownum的写法

eclipse安装教程_ecjelm-程序员宅基地

文章浏览阅读790次,点赞3次,收藏4次。官网下载下载链接:http://www.eclipse.org/downloads/点击Download下载完成后双击运行我选择第2个,看自己需要(我选择企业级应用,如果只是单纯学习java选第一个就行)进入下一步后选择jre和安装路径修改jvm/jre的时候也可以选择本地的(点后面的文件夹进去),但是我们没有11版本的,所以还是用他的吧选择接受安装中安装过程中如果有其他界面弹出就点accept就行..._ecjelm

Linux常用网络命令_ifconfig 删除vlan-程序员宅基地

文章浏览阅读245次。原文链接:https://linux.cn/article-7801-1.htmlifconfigping &lt;IP地址&gt;:发送ICMP echo消息到某个主机traceroute &lt;IP地址&gt;:用于跟踪IP包的路由路由:netstat -r: 打印路由表route add :添加静态路由路径routed:控制动态路由的BSD守护程序。运行RIP路由协议gat..._ifconfig 删除vlan

redux_redux redis-程序员宅基地

文章浏览阅读224次。reduxredux里要求把数据都放在公共的存储区域叫store里面,组件中尽量少放数据,假如绿色的组件要给很多灰色的组件传值,绿色的组件只需要改变store里面对应的数据就行了,接着灰色的组件会自动感知到store里的数据发生了改变,store只要有变化,灰色的组件就会自动从store里重新取数据,这样绿色组件的数据就很方便的传到其它灰色组件里了。redux就是把公用的数据放在公共的区域去存..._redux redis

linux 解压zip大文件(解决乱码问题)_linux 7za解压中文乱码-程序员宅基地

文章浏览阅读2.2k次,点赞3次,收藏6次。unzip版本不支持4G以上的压缩包所以要使用p7zip:Linux一个高压缩率软件wget http://sourceforge.net/projects/p7zip/files/p7zip/9.20.1/p7zip_9.20.1_src_all.tar.bz2tar jxvf p7zip_9.20.1_src_all.tar.bz2cd p7zip_9.20.1make && make install 如果安装失败,看一下报错是不是因为没有下载gcc 和 gcc ++(p7_linux 7za解压中文乱码