Epoll - создаем простой чат!

Материал из OpenWiki
Перейти к: навигация, поиск

К моменту написания данной программы информации по epoll практически не было даже на английском, и мною был создан простой проект чата с названием seChat (сокращение от "Simple Epoll Chat") для исследования ее возможностей. В статье хотелось поделится общими впечатлениями о epoll и помочь другим узнать ее на примере простого проекта.

Постановка задачи и оговорки

Наша задача - создать чат-проект с простыми программами, а именно:

  • сервер
    • прослушивает предопределенный ip адрес и порт, и "регистрирует" всех клиентов на обслуживание;
    • идентифицирует каждого нового клиента при подключении;
    • принимает сообщение от любого клиента, и рассылает всем, кроме "отправителя";
  • клиент может получать и отправлять сообщения одновременно;
  • тестер - программа для тестирования нагрузки на сервер с большим количеством одновременных подключений.

Оговоримся сразу:

  1. epoll используется для управления событиями о новых сообщениях как на стороне сервера, так и на стороне клиента;
  2. по всем незнакомым командам и функциям не поленитесь обратится к соответствующим руководствам, там все прекрасно описано;
  3. для простоты:
    1. в проекте "нормальных" обработчиков ошибок практически нет и при любых исключениях программа просто завершается с ошибкой (что оказалось очень практичным решением на стадии кодирования и тестирования!). В идеале, программа должна обработать ошибку и постараться вернуться в "рабочий режим", но я старался кодировать "без фанатизма" поддерживая философию "чем проще, тем лучше для усвоения";
    2. я жестко закодировал ip адрес и порт - мне лень каждый раз набивать их в параметрах запуска, а вы можете сделать по другому;

Требования

  • OS Linux с ядром не ниже версии 2.5.66
  • GNU gcc
  • GNU make

Файлы проекта seChat

Makefile

CC=gcc
# Remove -g -O2 flags after debug
CXXFLAGS=-Wall -g -O2

main: server client tester

clean:
       rm -f server client tester

local.h

Заголовочный файл, где определены:

  • основные константы;
  • макросы для удобства использования программы;
  • предварительно объявлены некоторые функции.

Особо примечательного нечего нет, судите сами:

   #ifndef _SCHAT_LOCAL_H_
   #define _SCHAT_LOCAL_H

   #include <iostream>
   #include <sys/types.h>
   #include <sys/socket.h>
   #include <netinet/in.h>
   #include <arpa/inet.h>
   #include <sys/epoll.h>
   #include <fcntl.h>
   #include <errno.h>
   #include <unistd.h>
   #include <stdio.h>
   #include <list>
   #include <time.h>

   // Default buffer size
   #define BUF_SIZE 1024

   // Default port
   #define SERVER_PORT 44444

   // seChat server ip, you should change it to your own server ip address
   #define SERVER_HOST "192.168.34.15"

   // Default timeout - http://linux.die.net/man/2/epoll_wait
   #define EPOLL_RUN_TIMEOUT -1

   // Count of connections that we are planning to handle (just hint to kernel)
   #define EPOLL_SIZE 10000

   // First welcome message from server
   #define STR_WELCOME "Welcome to seChat! You ID is: Client #%d"

   // Format of message population
   #define STR_MESSAGE "Client #%d>> %s"

   // Warning message if you alone in server
   #define STR_NOONE_CONNECTED "Noone connected to server except you!"

   // Commad to exit
   #define CMD_EXIT "EXIT"

   // Macros - exit in any error (eval < 0) case
   #define CHK(eval) if(eval < 0){perror("eval"); exit(-1);}

   // Macros - same as above, but save the result(res) of expression(eval)
   #define CHK2(res, eval) if((res = eval) < 0){perror("eval"); exit(-1);}

   // Preliminary declaration of functions
   int setnonblocking(int sockfd);
   void debug_epoll_event(epoll_event ev);
   int handle_message(int new_fd);
   int print_incoming(int fd);

   #endif

utils.h

Дополнительные утилиты

#include "local.h"

// Debug epoll_event
void debug_epoll_event(epoll_event ev){
       printf("fd(%d), ev.events:", ev.data.fd);

       if(ev.events & EPOLLIN)
               printf(" EPOLLIN ");
       if(ev.events & EPOLLOUT)
               printf(" EPOLLOUT ");
       if(ev.events & EPOLLET)
               printf(" EPOLLET ");
       if(ev.events & EPOLLPRI)
               printf(" EPOLLPRI ");
       if(ev.events & EPOLLRDNORM)
               printf(" EPOLLRDNORM ");
       if(ev.events & EPOLLRDBAND)
               printf(" EPOLLRDBAND ");
       if(ev.events & EPOLLWRNORM)
               printf(" EPOLLRDNORM ");
       if(ev.events & EPOLLWRBAND)
               printf(" EPOLLWRBAND ");
       if(ev.events & EPOLLMSG)
               printf(" EPOLLMSG ");
       if(ev.events & EPOLLERR)
               printf(" EPOLLERR ");
       if(ev.events & EPOLLHUP)
               printf(" EPOLLHUP ");
       if(ev.events & EPOLLONESHOT)
               printf(" EPOLLONESHOT ");

       printf("\n");

}

// Setup nonblocking socket
int setnonblocking(int sockfd)
{
   CHK(fcntl(sockfd, F_SETFL, fcntl(sockfd, F_GETFD, 0)|O_NONBLOCK));
   return 0;
}

server.cpp

Кодировать сервер было проще всего. Задача сервера предельна ясна, получать сообщения от клиента и делать массовые рассылки другим( если таковые есть на сервере) или отправить предупреждение отправителю об их отсутствии.

   #include "local.h"
   #include "utils.h"

   using namespace std;

   // To store client's socket list
   list<int> clients_list;

   // for debug mode
   int DEBUG_MODE = 0;

   int main(int argc, char *argv[])
   {
       // *** Define debug mode
       //     any additional parameres on startup
       //     i.e. like './server f' or './server debug'
       //     we will switch to switch to debug mode(very simple anmd useful)
       if(argc > 1) DEBUG_MODE = 1;
      
       if(DEBUG_MODE){
           printf("Debug mode is ON!\n");
           printf("MAIN: argc = %d\n", argc);
           for(int i=0; i<argc; i++)
               printf(" argv[%d] = %s\n", i, argv[i]);
       }else printf("Debug mode is OFF!\n");

       // *** Define values
       //     main server listener
       int listener;
      
       // define ip & ports for server(addr)
       //     and incoming client ip & ports(their_addr)
       struct sockaddr_in addr, their_addr;
       //     configure ip & port for listen
       addr.sin_family = PF_INET;
       addr.sin_port = htons(SERVER_PORT);
       addr.sin_addr.s_addr = inet_addr(SERVER_HOST);

       //     size of address
       socklen_t socklen;
       socklen = sizeof(struct sockaddr_in);

       //     event template for epoll_ctl(ev)
       //     storage array for incoming events from epoll_wait(events)
       //        and maximum events count could be EPOLL_SIZE
       static struct epoll_event ev, events[EPOLL_SIZE];
       //     watch just incoming(EPOLLIN)
       //     and Edge Trigged(EPOLLET) events
       ev.events = EPOLLIN | EPOLLET;

       //     chat message buffer
       char message[BUF_SIZE];

       //     epoll descriptor to watch events
       int epfd;

       //     to calculate the execution time of a program
       clock_t tStart;

       // other values:
       //     new client descriptor(client)
       //     to keep the results of different functions(res)
       //     to keep incoming epoll_wait's events count(epoll_events_count)
       int client, res, epoll_events_count;


       // *** Setup server listener
       //     create listener with PF_INET(IPv4) and
       //     SOCK_STREAM(sequenced, reliable, two-way, connection-based byte stream)
       CHK2(listener, socket(PF_INET, SOCK_STREAM, 0));
       printf("Main listener(fd=%d) created! \n",listener);

       //    setup nonblocking socket
       setnonblocking(listener);

       //    bind listener to address(addr)
       CHK(bind(listener, (struct sockaddr *)&addr, sizeof(addr)));
       printf("Listener binded to: %s\n", SERVER_HOST);

       //    start to listen connections
       CHK(listen(listener, 1));
       printf("Start to listen: %s!\n", SERVER_HOST);

       // *** Setup epoll
       //     create epoll descriptor
       //     and backup store for EPOLL_SIZE of socket events
       CHK2(epfd,epoll_create(EPOLL_SIZE));
       printf("Epoll(fd=%d) created!\n", epfd);
      
       //     set listener to event template
       ev.data.fd = listener;

       //     add listener to epoll
       CHK(epoll_ctl(epfd, EPOLL_CTL_ADD, listener, &ev));
       printf("Main listener(%d) added to epoll!\n", epfd);

       // *** Main cycle(epoll_wait)
       while(1)
       {
           CHK2(epoll_events_count,epoll_wait(epfd, events, EPOLL_SIZE, EPOLL_RUN_TIMEOUT));
           if(DEBUG_MODE) printf("Epoll events count: %d\n", epoll_events_count);
           // setup tStart time
           tStart = clock();

           for(int i = 0; i < epoll_events_count ; i++)
           {
                   if(DEBUG_MODE){
                           printf("events[%d].data.fd = %d\n", i, events[i].data.fd);
                           debug_epoll_event(events[i]);
    
                   }
                   // EPOLLIN event for listener(new client connection)
                   if(events[i].data.fd == listener)
                   {
                           CHK2(client,accept(listener, (struct sockaddr *) &their_addr, &socklen));
                           if(DEBUG_MODE) printf("connection from:%s:%d, socket assigned to:%d \n",
                                             inet_ntoa(their_addr.sin_addr),
                                             ntohs(their_addr.sin_port),
                                             client);
                           // setup nonblocking socket
                           setnonblocking(client);

                           // set new client to event template
                           ev.data.fd = client;

                           // add new client to epoll
                           CHK(epoll_ctl(epfd, EPOLL_CTL_ADD, client, &ev));

                           // save new descriptor to further use
                           clients_list.push_back(client); // add new connection to list of clients
                           if(DEBUG_MODE) printf("Add new client(fd = %d) to epoll and now clients_list.size = %d\n", 
                                               client,  
                                               clients_list.size());

                           // send initial welcome message to client
                           bzero(message, BUF_SIZE);
                           res = sprintf(message, STR_WELCOME, client);
                           CHK2(res, send(client, message, BUF_SIZE, 0));

                   }else { // EPOLLIN event for others(new incoming message from client)
                           CHK2(res,handle_message(events[i].data.fd));
                   }
           }
           // print epoll events handling statistics
           printf("Statistics: %d events handled at: %.2f second(s)\n",
                                           epoll_events_count,
                                           (double)(clock() - tStart)/CLOCKS_PER_SEC);
       }

       close(listener);
       close(epfd);

       return 0;
   }

   // *** Handle incoming message from clients
   int handle_message(int client)
   {
       // get row message from client(buf)
       //     and format message to populate(message)
       char buf[BUF_SIZE], message[BUF_SIZE];
       bzero(buf, BUF_SIZE);
       bzero(message, BUF_SIZE);

       // to keep different results
       int len;

       // try to get new raw message from client
       if(DEBUG_MODE) printf("Try to read from fd(%d)\n", client);
       CHK2(len,recv(client, buf, BUF_SIZE, 0));

       // zero size of len mean the client closed connection
       if(len == 0){
           CHK(close(client));
           clients_list.remove(client);
           if(DEBUG_MODE) printf("Client with fd: %d closed! And now clients_list.size = %d\n", client, clients_list.size());
       // populate message around the world
       }else{

           if(clients_list.size() == 1) { // this means that noone connected to server except YOU!
                   CHK(send(client, STR_NOONE_CONNECTED, strlen(STR_NOONE_CONNECTED), 0));
                   return len;
           }
          
           // format message to populate
           sprintf(message, STR_MESSAGE, client, buf);

           // populate message around the world ;-)...
           list<int>::iterator it;
           for(it = clients_list.begin(); it != clients_list.end(); it++){
              if(*it != client){ // ... except youself of course
                   CHK(send(*it, message, BUF_SIZE, 0));
                   if(DEBUG_MODE) printf("Message '%s' send to client with fd(%d) \n", message, *it);
              }
           }
           if(DEBUG_MODE) printf("Client(%d) received message successfully:'%s', a total of %d bytes data...\n",
                client,
                buf,
                len);
       }

       return len;
   }

client.cpp

Главная проблема клиентской части - одновременно следить за новыми сообщения как от пользователя, так и от сервера. И я решил ее созданием двух процессов (родительского и дочернего, через fork) для того что бы:

  • дочерний процесс - ожидал ввода сообщения от пользователя;
  • родительский процесс - ожидал новых сообщений как от сервера, так и дочернего процесса используя все то же epoll.

Связь между дочерним и родительским процессом осуществляется через pipe (в 'man pipe' есть отличный пример как это сделать).

#include "local.h"
#include "utils.h"

using namespace std;

// chat message buffer
char message[BUF_SIZE];

// for debug mode
int DEBUG_MODE = 0;

/*
  We use 'fork' to make two process.
    Child process:
    - waiting for user's input message;
    - and sending all users messages to parent process through pipe.
    ('man pipe' has good example how to do it)

    Parent process:
    - wating for incoming messages(EPOLLIN):
    -- from server(socket) to display;
    -- from child process(pipe) to transmit to server(socket)
     
*/

int main(int argc, char *argv[])
{
        // *** Define debug mode
        //     any additional parameres on startup
        //     i.e. like './client f' or './client debug'
        //     we will switch to debug mode(very simple anmd useful)
        if(argc > 1) DEBUG_MODE = 1;

        if(DEBUG_MODE){
                printf("Debug mode is ON!\n");
                printf("MAIN: argc = %d\n", argc);
                for(int i=0; i<argc; i++)
                        printf(" argv[%d] = %s\n", i, argv[i]);
        }else printf("Debug mode is OFF!\n");

        // *** Define values
        //     socket connection with server(sock)
        //     process ID(pid)
        //     pipe between chils & parent processes(pipe_fd)
        //     epoll descriptor to watch events
        int sock, pid, pipe_fd[2], epfd;

        //     define ip & ports for server(addr)
        struct sockaddr_in addr;
        addr.sin_family = PF_INET;
        addr.sin_port = htons(SERVER_PORT);
        addr.sin_addr.s_addr = inet_addr(SERVER_HOST);

        //     event template for epoll_ctl(ev)
        //     storage array for incoming events from epoll_wait(events)
        //     and maximum events count could be 2
        //     'sock' from server and 'pipe' from parent process(user inputs)
        static struct epoll_event ev, events[2]; // Socket(in|out) & Pipe(in)
        ev.events = EPOLLIN | EPOLLET;

        //     if it's zero, we should shoud down client
        int continue_to_work = 1;

        // *** Setup socket connection with server
        CHK2(sock,socket(PF_INET, SOCK_STREAM, 0));
        CHK(connect(sock, (struct sockaddr *)&addr, sizeof(addr)) < 0);

        // *** Setup pipe to send messages from child process to parent
        CHK(pipe(pipe_fd));
        if(DEBUG_MODE) printf("Created pipe with pipe_fd[0](read part): %d and pipe_fd[1](write part): % d\n",
                        pipe_fd[0],
                        pipe_fd[1]);

        // *** Create & configure epoll
        CHK2(epfd,epoll_create(EPOLL_SIZE));
        if(DEBUG_MODE) printf("Created epoll with fd: %d\n", epfd);

        //     add server connetion(sock) to epoll to listen incoming messages from server
        ev.data.fd = sock;
        CHK(epoll_ctl(epfd, EPOLL_CTL_ADD, sock, &ev));
        if(DEBUG_MODE) printf("Socket connection (fd = %d) added to epoll\n", sock);

        //     add read part of pipe(pipe_fd[0]) to epoll
        //     to listen incoming messages from child process
        ev.data.fd = pipe_fd[0];
        CHK(epoll_ctl(epfd, EPOLL_CTL_ADD, pipe_fd[0], &ev));
        if(DEBUG_MODE) printf("Pipe[0] (read) with fd(%d) added to epoll\n", pipe_fd[0]);

        // Fork
        CHK2(pid,fork());
        switch(pid){
                case 0: // child process
                        close(pipe_fd[0]); // we dont need read pipe anymore
                        printf("Enter 'exit' to exit\n");
                        while(continue_to_work){
                                bzero(&message, BUF_SIZE);
                                fgets(message, BUF_SIZE, stdin);

                                // close while cycle for 'exit' command
                                if(strncasecmp(message, CMD_EXIT, strlen(CMD_EXIT)) == 0){
                                        continue_to_work = 0;
                                        // send user's message to parent process
                                }else CHK(write(pipe_fd[1], message, strlen(message) - 1));
                        }
                        break;
                default: //parent process
                        close(pipe_fd[1]); // we dont need write pipe anymore

                        // incoming epoll_wait's events count(epoll_events_count)
                        // results of different functions(res)
                        int epoll_events_count, res;

                        // *** Main cycle(epoll_wait)
                        while(continue_to_work) {
                                CHK2(epoll_events_count,epoll_wait(epfd, events, 2, EPOLL_RUN_TIMEOUT));
                                if(DEBUG_MODE) printf("Epoll events count: %d\n", epoll_events_count);

                                for(int i = 0; i < epoll_events_count ; i++){
                                        bzero(&message, BUF_SIZE);

                                        // EPOLLIN event from server( new message from server)
                                        if(events[i].data.fd == sock){
                                                if(DEBUG_MODE) printf("Server sends new message!\n");
                                                CHK2(res,recv(sock, message, BUF_SIZE, 0));

                                                // zero size of result means the server closed connection
                                                if(res == 0){
                                                        if(DEBUG_MODE) printf("Server closed connection: %d\n", sock);
                                                        CHK(close(sock));
                                                        continue_to_work = 0;
                                                }else printf("%s\n", message);

                                                // EPOLLIN event from child process(user's input message)
                                        }else{
                                                if(DEBUG_MODE) printf("New pipe event!\n");
                                                CHK2(res, read(events[i].data.fd, message, BUF_SIZE));

                                                // zero size of result means the child process going to exit
                                                if(res == 0) continue_to_work = 0; // exit parent to
                                                // send message to server
                                                else{
                                                        CHK(send(sock, message, BUF_SIZE, 0));
                                                }
                                        }
                                }
                        }
        }
        if(pid){
                if(DEBUG_MODE) printf("Shutting down parent!\n");
                close(pipe_fd[0]);
                close(sock);
        }else{
                if(DEBUG_MODE) printf("Shutting down child!\n");
                close(pipe_fd[1]);
        }

        return 0;
}

tester.cpp

Программа для тестирования:

  • открывает одновременно EPOLL_SIZE соединений с сервером (в моем случае EPOLL_SIZE = 10000);
  • получает по каждому соединению отдельное "приветствие сервера";
  • закрывает все соединения;
  • и выводит небольшую статистику о своей работе.

Все просто!

#include "local.h"
#include "utils.h" 

using namespace std;

// to keep message from server
char message[BUF_SIZE];

// for debuf mode
int DEBUG_MODE = 0;

// to store client's sockets list
list<int> list_of_clients;

// to keep result of different functions
int res;

// to calculate the execution time of a program
clock_t tStart;

int main(int argc, char *argv[])
{
   // *** Define debug mode
   //     any additional parameres on startup
   //     i.e. like './server f' or './server debug'
   //     we will switch to switch to debug mode(very simple anmd useful)
   if(argc > 1) DEBUG_MODE = 1;

    if(DEBUG_MODE){
        printf("Debug mode is ON!\n");
        printf("MAIN: argc = %d\n", argc);
        for(int i=0; i<argc; i++)
            printf(" argv[%d] = %s\n", i, argv[i]);
    }else printf("Debug mode is OFF!\n");

    // *** Define values
    //     connetion with server
    int sock;
    
    //     define address & port of server
    struct sockaddr_in addr;
    addr.sin_family = PF_INET;
    addr.sin_port = htons(SERVER_PORT);
    addr.sin_addr.s_addr = inet_addr(SERVER_HOST);
    
    // setup start time
    tStart = clock();

    // create EPOLL_SIZE connections with server
    for(int i=0 ; i<EPOLL_SIZE; i++){

               // create new socket connection with server 
               CHK2(sock,socket(PF_INET, SOCK_STREAM, 0));
               CHK(connect(sock, (struct sockaddr *)&addr, sizeof(addr)) < 0);
               list_of_clients.push_back(sock);
               if(DEBUG_MODE) printf("Create new test client with fd: %d\n", sock);

               // Get welcome messge from server!
               bzero(&message, BUF_SIZE);
               CHK2(res,recv(sock, message, BUF_SIZE, 0));
               printf("%s\n", message);
    }
   
    // close all connections
    list<int>::iterator it;
    for(it = list_of_clients.begin(); it != list_of_clients.end() ; it++)
       close(*it);

    // print statistics 
    printf("Test passed at: %.2f second(s)\n", (double)(clock() - tStart)/CLOCKS_PER_SEC); 
    printf("Total server connections was: %d\n", EPOLL_SIZE);
    
    return 0;
}

Заключение

Лучше один раз увидеть, чем сто раз услышать - тут картинка с результатом работы программы-теста.

Sechatstat.PNG

ИТОГИ:

  1. По результатам тестов на моей машине (виртуальная CentOS 5.2 на Proxmox c оперативкой 1Гб и одним выделенным процессором) сервер обработал около 7000 соединений за четверть секунды. По моему не плохо!
  2. Для удобства добавил все исходники в Google Code открыв новый проект sechat, тестируйте/пользуйтесь кому интересно.

Приветствуются любая критика и замечания - maksud.nurullaev[DOG]gmail.com

P.S.

  • если при запуске теста, система будет ругаться на большое количество одновременно открытых файлов(дескрипторов), проверьте свои лимиты через ulimit -n и измените на подходящее значение;
  • тексты программ изобилуют комментариями, но если будут какие то вопросы или пожелания о детализации и переводе, нет проблем, сделаю как только освобожусь в ближайшее время;
  • я создал проект ТОЛЬКО ЛИШЬ для тестирования epoll, а не для создания "шедеврального" чата, так что заранее прошу извинить за примитивность кода.

--Maksud 05:17, 8 апреля 2010 (UTC)