Epoll - создаем простой чат!
К моменту написания данной программы информации по epoll практически не было даже на английском, и мною был создан простой проект чата с названием seChat (сокращение от "Simple Epoll Chat") для исследования ее возможностей. В статье хотелось поделится общими впечатлениями о epoll и помочь другим узнать ее на примере простого проекта.
Содержание
Постановка задачи и оговорки
Наша задача - создать чат-проект с простыми программами, а именно:
- сервер
- прослушивает предопределенный ip адрес и порт, и "регистрирует" всех клиентов на обслуживание;
- идентифицирует каждого нового клиента при подключении;
- принимает сообщение от любого клиента, и рассылает всем, кроме "отправителя";
- клиент может получать и отправлять сообщения одновременно;
- тестер - программа для тестирования нагрузки на сервер с большим количеством одновременных подключений.
Оговоримся сразу:
- epoll используется для управления событиями о новых сообщениях как на стороне сервера, так и на стороне клиента;
- по всем незнакомым командам и функциям не поленитесь обратится к соответствующим руководствам, там все прекрасно описано;
- для простоты:
- в проекте "нормальных" обработчиков ошибок практически нет и при любых исключениях программа просто завершается с ошибкой (что оказалось очень практичным решением на стадии кодирования и тестирования!). В идеале, программа должна обработать ошибку и постараться вернуться в "рабочий режим", но я старался кодировать "без фанатизма" поддерживая философию "чем проще, тем лучше для усвоения";
- я жестко закодировал ip адрес и порт - мне лень каждый раз набивать их в параметрах запуска, а вы можете сделать по другому;
Требования
- OS Linux с ядром не ниже версии 2.5.66
- GNU gcc
- GNU make
Файлы проекта seChat
Makefile
CC=gcc # Remove -g -O2 flags after debug CXXFLAGS=-Wall -g -O2 main: server client tester clean: rm -f server client tester
local.h
Заголовочный файл, где определены:
- основные константы;
- макросы для удобства использования программы;
- предварительно объявлены некоторые функции.
Особо примечательного нечего нет, судите сами:
#ifndef _SCHAT_LOCAL_H_ #define _SCHAT_LOCAL_H #include <iostream> #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <sys/epoll.h> #include <fcntl.h> #include <errno.h> #include <unistd.h> #include <stdio.h> #include <list> #include <time.h> // Default buffer size #define BUF_SIZE 1024 // Default port #define SERVER_PORT 44444 // seChat server ip, you should change it to your own server ip address #define SERVER_HOST "192.168.34.15" // Default timeout - http://linux.die.net/man/2/epoll_wait #define EPOLL_RUN_TIMEOUT -1 // Count of connections that we are planning to handle (just hint to kernel) #define EPOLL_SIZE 10000 // First welcome message from server #define STR_WELCOME "Welcome to seChat! You ID is: Client #%d" // Format of message population #define STR_MESSAGE "Client #%d>> %s" // Warning message if you alone in server #define STR_NOONE_CONNECTED "Noone connected to server except you!" // Commad to exit #define CMD_EXIT "EXIT" // Macros - exit in any error (eval < 0) case #define CHK(eval) if(eval < 0){perror("eval"); exit(-1);} // Macros - same as above, but save the result(res) of expression(eval) #define CHK2(res, eval) if((res = eval) < 0){perror("eval"); exit(-1);} // Preliminary declaration of functions int setnonblocking(int sockfd); void debug_epoll_event(epoll_event ev); int handle_message(int new_fd); int print_incoming(int fd); #endif
utils.h
Дополнительные утилиты
#include "local.h" // Debug epoll_event void debug_epoll_event(epoll_event ev){ printf("fd(%d), ev.events:", ev.data.fd); if(ev.events & EPOLLIN) printf(" EPOLLIN "); if(ev.events & EPOLLOUT) printf(" EPOLLOUT "); if(ev.events & EPOLLET) printf(" EPOLLET "); if(ev.events & EPOLLPRI) printf(" EPOLLPRI "); if(ev.events & EPOLLRDNORM) printf(" EPOLLRDNORM "); if(ev.events & EPOLLRDBAND) printf(" EPOLLRDBAND "); if(ev.events & EPOLLWRNORM) printf(" EPOLLRDNORM "); if(ev.events & EPOLLWRBAND) printf(" EPOLLWRBAND "); if(ev.events & EPOLLMSG) printf(" EPOLLMSG "); if(ev.events & EPOLLERR) printf(" EPOLLERR "); if(ev.events & EPOLLHUP) printf(" EPOLLHUP "); if(ev.events & EPOLLONESHOT) printf(" EPOLLONESHOT "); printf("\n"); } // Setup nonblocking socket int setnonblocking(int sockfd) { CHK(fcntl(sockfd, F_SETFL, fcntl(sockfd, F_GETFD, 0)|O_NONBLOCK)); return 0; }
server.cpp
Кодировать сервер было проще всего. Задача сервера предельна ясна, получать сообщения от клиента и делать массовые рассылки другим( если таковые есть на сервере) или отправить предупреждение отправителю об их отсутствии.
#include "local.h" #include "utils.h" using namespace std; // To store client's socket list list<int> clients_list; // for debug mode int DEBUG_MODE = 0; int main(int argc, char *argv[]) { // *** Define debug mode // any additional parameres on startup // i.e. like './server f' or './server debug' // we will switch to switch to debug mode(very simple anmd useful) if(argc > 1) DEBUG_MODE = 1; if(DEBUG_MODE){ printf("Debug mode is ON!\n"); printf("MAIN: argc = %d\n", argc); for(int i=0; i<argc; i++) printf(" argv[%d] = %s\n", i, argv[i]); }else printf("Debug mode is OFF!\n"); // *** Define values // main server listener int listener; // define ip & ports for server(addr) // and incoming client ip & ports(their_addr) struct sockaddr_in addr, their_addr; // configure ip & port for listen addr.sin_family = PF_INET; addr.sin_port = htons(SERVER_PORT); addr.sin_addr.s_addr = inet_addr(SERVER_HOST); // size of address socklen_t socklen; socklen = sizeof(struct sockaddr_in); // event template for epoll_ctl(ev) // storage array for incoming events from epoll_wait(events) // and maximum events count could be EPOLL_SIZE static struct epoll_event ev, events[EPOLL_SIZE]; // watch just incoming(EPOLLIN) // and Edge Trigged(EPOLLET) events ev.events = EPOLLIN | EPOLLET; // chat message buffer char message[BUF_SIZE]; // epoll descriptor to watch events int epfd; // to calculate the execution time of a program clock_t tStart; // other values: // new client descriptor(client) // to keep the results of different functions(res) // to keep incoming epoll_wait's events count(epoll_events_count) int client, res, epoll_events_count; // *** Setup server listener // create listener with PF_INET(IPv4) and // SOCK_STREAM(sequenced, reliable, two-way, connection-based byte stream) CHK2(listener, socket(PF_INET, SOCK_STREAM, 0)); printf("Main listener(fd=%d) created! \n",listener); // setup nonblocking socket setnonblocking(listener); // bind listener to address(addr) CHK(bind(listener, (struct sockaddr *)&addr, sizeof(addr))); printf("Listener binded to: %s\n", SERVER_HOST); // start to listen connections CHK(listen(listener, 1)); printf("Start to listen: %s!\n", SERVER_HOST); // *** Setup epoll // create epoll descriptor // and backup store for EPOLL_SIZE of socket events CHK2(epfd,epoll_create(EPOLL_SIZE)); printf("Epoll(fd=%d) created!\n", epfd); // set listener to event template ev.data.fd = listener; // add listener to epoll CHK(epoll_ctl(epfd, EPOLL_CTL_ADD, listener, &ev)); printf("Main listener(%d) added to epoll!\n", epfd); // *** Main cycle(epoll_wait) while(1) { CHK2(epoll_events_count,epoll_wait(epfd, events, EPOLL_SIZE, EPOLL_RUN_TIMEOUT)); if(DEBUG_MODE) printf("Epoll events count: %d\n", epoll_events_count); // setup tStart time tStart = clock(); for(int i = 0; i < epoll_events_count ; i++) { if(DEBUG_MODE){ printf("events[%d].data.fd = %d\n", i, events[i].data.fd); debug_epoll_event(events[i]); } // EPOLLIN event for listener(new client connection) if(events[i].data.fd == listener) { CHK2(client,accept(listener, (struct sockaddr *) &their_addr, &socklen)); if(DEBUG_MODE) printf("connection from:%s:%d, socket assigned to:%d \n", inet_ntoa(their_addr.sin_addr), ntohs(their_addr.sin_port), client); // setup nonblocking socket setnonblocking(client); // set new client to event template ev.data.fd = client; // add new client to epoll CHK(epoll_ctl(epfd, EPOLL_CTL_ADD, client, &ev)); // save new descriptor to further use clients_list.push_back(client); // add new connection to list of clients if(DEBUG_MODE) printf("Add new client(fd = %d) to epoll and now clients_list.size = %d\n", client, clients_list.size()); // send initial welcome message to client bzero(message, BUF_SIZE); res = sprintf(message, STR_WELCOME, client); CHK2(res, send(client, message, BUF_SIZE, 0)); }else { // EPOLLIN event for others(new incoming message from client) CHK2(res,handle_message(events[i].data.fd)); } } // print epoll events handling statistics printf("Statistics: %d events handled at: %.2f second(s)\n", epoll_events_count, (double)(clock() - tStart)/CLOCKS_PER_SEC); } close(listener); close(epfd); return 0; } // *** Handle incoming message from clients int handle_message(int client) { // get row message from client(buf) // and format message to populate(message) char buf[BUF_SIZE], message[BUF_SIZE]; bzero(buf, BUF_SIZE); bzero(message, BUF_SIZE); // to keep different results int len; // try to get new raw message from client if(DEBUG_MODE) printf("Try to read from fd(%d)\n", client); CHK2(len,recv(client, buf, BUF_SIZE, 0)); // zero size of len mean the client closed connection if(len == 0){ CHK(close(client)); clients_list.remove(client); if(DEBUG_MODE) printf("Client with fd: %d closed! And now clients_list.size = %d\n", client, clients_list.size()); // populate message around the world }else{ if(clients_list.size() == 1) { // this means that noone connected to server except YOU! CHK(send(client, STR_NOONE_CONNECTED, strlen(STR_NOONE_CONNECTED), 0)); return len; } // format message to populate sprintf(message, STR_MESSAGE, client, buf); // populate message around the world ;-)... list<int>::iterator it; for(it = clients_list.begin(); it != clients_list.end(); it++){ if(*it != client){ // ... except youself of course CHK(send(*it, message, BUF_SIZE, 0)); if(DEBUG_MODE) printf("Message '%s' send to client with fd(%d) \n", message, *it); } } if(DEBUG_MODE) printf("Client(%d) received message successfully:'%s', a total of %d bytes data...\n", client, buf, len); } return len; }
client.cpp
Главная проблема клиентской части - одновременно следить за новыми сообщения как от пользователя, так и от сервера. И я решил ее созданием двух процессов (родительского и дочернего, через fork) для того что бы:
- дочерний процесс - ожидал ввода сообщения от пользователя;
- родительский процесс - ожидал новых сообщений как от сервера, так и дочернего процесса используя все то же epoll.
Связь между дочерним и родительским процессом осуществляется через pipe (в 'man pipe' есть отличный пример как это сделать).
#include "local.h" #include "utils.h" using namespace std; // chat message buffer char message[BUF_SIZE]; // for debug mode int DEBUG_MODE = 0; /* We use 'fork' to make two process. Child process: - waiting for user's input message; - and sending all users messages to parent process through pipe. ('man pipe' has good example how to do it) Parent process: - wating for incoming messages(EPOLLIN): -- from server(socket) to display; -- from child process(pipe) to transmit to server(socket) */ int main(int argc, char *argv[]) { // *** Define debug mode // any additional parameres on startup // i.e. like './client f' or './client debug' // we will switch to debug mode(very simple anmd useful) if(argc > 1) DEBUG_MODE = 1; if(DEBUG_MODE){ printf("Debug mode is ON!\n"); printf("MAIN: argc = %d\n", argc); for(int i=0; i<argc; i++) printf(" argv[%d] = %s\n", i, argv[i]); }else printf("Debug mode is OFF!\n"); // *** Define values // socket connection with server(sock) // process ID(pid) // pipe between chils & parent processes(pipe_fd) // epoll descriptor to watch events int sock, pid, pipe_fd[2], epfd; // define ip & ports for server(addr) struct sockaddr_in addr; addr.sin_family = PF_INET; addr.sin_port = htons(SERVER_PORT); addr.sin_addr.s_addr = inet_addr(SERVER_HOST); // event template for epoll_ctl(ev) // storage array for incoming events from epoll_wait(events) // and maximum events count could be 2 // 'sock' from server and 'pipe' from parent process(user inputs) static struct epoll_event ev, events[2]; // Socket(in|out) & Pipe(in) ev.events = EPOLLIN | EPOLLET; // if it's zero, we should shoud down client int continue_to_work = 1; // *** Setup socket connection with server CHK2(sock,socket(PF_INET, SOCK_STREAM, 0)); CHK(connect(sock, (struct sockaddr *)&addr, sizeof(addr)) < 0); // *** Setup pipe to send messages from child process to parent CHK(pipe(pipe_fd)); if(DEBUG_MODE) printf("Created pipe with pipe_fd[0](read part): %d and pipe_fd[1](write part): % d\n", pipe_fd[0], pipe_fd[1]); // *** Create & configure epoll CHK2(epfd,epoll_create(EPOLL_SIZE)); if(DEBUG_MODE) printf("Created epoll with fd: %d\n", epfd); // add server connetion(sock) to epoll to listen incoming messages from server ev.data.fd = sock; CHK(epoll_ctl(epfd, EPOLL_CTL_ADD, sock, &ev)); if(DEBUG_MODE) printf("Socket connection (fd = %d) added to epoll\n", sock); // add read part of pipe(pipe_fd[0]) to epoll // to listen incoming messages from child process ev.data.fd = pipe_fd[0]; CHK(epoll_ctl(epfd, EPOLL_CTL_ADD, pipe_fd[0], &ev)); if(DEBUG_MODE) printf("Pipe[0] (read) with fd(%d) added to epoll\n", pipe_fd[0]); // Fork CHK2(pid,fork()); switch(pid){ case 0: // child process close(pipe_fd[0]); // we dont need read pipe anymore printf("Enter 'exit' to exit\n"); while(continue_to_work){ bzero(&message, BUF_SIZE); fgets(message, BUF_SIZE, stdin); // close while cycle for 'exit' command if(strncasecmp(message, CMD_EXIT, strlen(CMD_EXIT)) == 0){ continue_to_work = 0; // send user's message to parent process }else CHK(write(pipe_fd[1], message, strlen(message) - 1)); } break; default: //parent process close(pipe_fd[1]); // we dont need write pipe anymore // incoming epoll_wait's events count(epoll_events_count) // results of different functions(res) int epoll_events_count, res; // *** Main cycle(epoll_wait) while(continue_to_work) { CHK2(epoll_events_count,epoll_wait(epfd, events, 2, EPOLL_RUN_TIMEOUT)); if(DEBUG_MODE) printf("Epoll events count: %d\n", epoll_events_count); for(int i = 0; i < epoll_events_count ; i++){ bzero(&message, BUF_SIZE); // EPOLLIN event from server( new message from server) if(events[i].data.fd == sock){ if(DEBUG_MODE) printf("Server sends new message!\n"); CHK2(res,recv(sock, message, BUF_SIZE, 0)); // zero size of result means the server closed connection if(res == 0){ if(DEBUG_MODE) printf("Server closed connection: %d\n", sock); CHK(close(sock)); continue_to_work = 0; }else printf("%s\n", message); // EPOLLIN event from child process(user's input message) }else{ if(DEBUG_MODE) printf("New pipe event!\n"); CHK2(res, read(events[i].data.fd, message, BUF_SIZE)); // zero size of result means the child process going to exit if(res == 0) continue_to_work = 0; // exit parent to // send message to server else{ CHK(send(sock, message, BUF_SIZE, 0)); } } } } } if(pid){ if(DEBUG_MODE) printf("Shutting down parent!\n"); close(pipe_fd[0]); close(sock); }else{ if(DEBUG_MODE) printf("Shutting down child!\n"); close(pipe_fd[1]); } return 0; }
tester.cpp
Программа для тестирования:
- открывает одновременно EPOLL_SIZE соединений с сервером (в моем случае EPOLL_SIZE = 10000);
- получает по каждому соединению отдельное "приветствие сервера";
- закрывает все соединения;
- и выводит небольшую статистику о своей работе.
Все просто!
#include "local.h" #include "utils.h" using namespace std; // to keep message from server char message[BUF_SIZE]; // for debuf mode int DEBUG_MODE = 0; // to store client's sockets list list<int> list_of_clients; // to keep result of different functions int res; // to calculate the execution time of a program clock_t tStart; int main(int argc, char *argv[]) { // *** Define debug mode // any additional parameres on startup // i.e. like './server f' or './server debug' // we will switch to switch to debug mode(very simple anmd useful) if(argc > 1) DEBUG_MODE = 1; if(DEBUG_MODE){ printf("Debug mode is ON!\n"); printf("MAIN: argc = %d\n", argc); for(int i=0; i<argc; i++) printf(" argv[%d] = %s\n", i, argv[i]); }else printf("Debug mode is OFF!\n"); // *** Define values // connetion with server int sock; // define address & port of server struct sockaddr_in addr; addr.sin_family = PF_INET; addr.sin_port = htons(SERVER_PORT); addr.sin_addr.s_addr = inet_addr(SERVER_HOST); // setup start time tStart = clock(); // create EPOLL_SIZE connections with server for(int i=0 ; i<EPOLL_SIZE; i++){ // create new socket connection with server CHK2(sock,socket(PF_INET, SOCK_STREAM, 0)); CHK(connect(sock, (struct sockaddr *)&addr, sizeof(addr)) < 0); list_of_clients.push_back(sock); if(DEBUG_MODE) printf("Create new test client with fd: %d\n", sock); // Get welcome messge from server! bzero(&message, BUF_SIZE); CHK2(res,recv(sock, message, BUF_SIZE, 0)); printf("%s\n", message); } // close all connections list<int>::iterator it; for(it = list_of_clients.begin(); it != list_of_clients.end() ; it++) close(*it); // print statistics printf("Test passed at: %.2f second(s)\n", (double)(clock() - tStart)/CLOCKS_PER_SEC); printf("Total server connections was: %d\n", EPOLL_SIZE); return 0; }
Заключение
Лучше один раз увидеть, чем сто раз услышать - тут картинка с результатом работы программы-теста.
ИТОГИ:
- По результатам тестов на моей машине (виртуальная CentOS 5.2 на Proxmox c оперативкой 1Гб и одним выделенным процессором) сервер обработал около 7000 соединений за четверть секунды. По моему не плохо!
- Для удобства добавил все исходники в Google Code открыв новый проект sechat, тестируйте/пользуйтесь кому интересно.
Приветствуются любая критика и замечания - maksud.nurullaev[DOG]gmail.com
P.S.
- если при запуске теста, система будет ругаться на большое количество одновременно открытых файлов(дескрипторов), проверьте свои лимиты через ulimit -n и измените на подходящее значение;
- тексты программ изобилуют комментариями, но если будут какие то вопросы или пожелания о детализации и переводе, нет проблем, сделаю как только освобожусь в ближайшее время;
- я создал проект ТОЛЬКО ЛИШЬ для тестирования epoll, а не для создания "шедеврального" чата, так что заранее прошу извинить за примитивность кода.
--Maksud 05:17, 8 апреля 2010 (UTC)