Epoll - создаем простой чат!
К моменту написания данной программы информации по epoll практически не было даже на английском, и мною был создан простой проект чата с названием seChat (сокращение от "Simple Epoll Chat") для исследования ее возможностей. В статье хотелось поделится общими впечатлениями о epoll и помочь другим узнать ее на примере простого проекта.
Содержание
Постановка задачи и оговорки
Наша задача - создать чат-проект с простыми программами, а именно:
- сервер
- прослушивает предопределенный ip адрес и порт, и "регистрирует" всех клиентов на обслуживание;
- идентифицирует каждого нового клиента при подключении;
- принимает сообщение от любого клиента, и рассылает всем, кроме "отправителя";
- клиент может получать и отправлять сообщения одновременно;
- тестер - программа для тестирования нагрузки на сервер с большим количеством одновременных подключений.
Оговоримся сразу:
- epoll используется для управления событиями о новых сообщениях как на стороне сервера, так и на стороне клиента;
- по всем незнакомым командам и функциям не поленитесь обратится к соответствующим руководствам, там все прекрасно описано;
- для простоты:
- в проекте "нормальных" обработчиков ошибок практически нет и при любых исключениях программа просто завершается с ошибкой (что оказалось очень практичным решением на стадии кодирования и тестирования!). В идеале, программа должна обработать ошибку и постараться вернуться в "рабочий режим", но я старался кодировать "без фанатизма" поддерживая философию "чем проще, тем лучше для усвоения";
- я жестко закодировал ip адрес и порт - мне лень каждый раз набивать их в параметрах запуска, а вы можете сделать по другому;
Требования
- OS Linux с ядром не ниже версии 2.5.66
- GNU gcc
- GNU make
Файлы проекта seChat
Makefile
CC=gcc
# Remove -g -O2 flags after debug
CXXFLAGS=-Wall -g -O2
main: server client tester
clean:
rm -f server client tester
local.h
Заголовочный файл, где определены:
- основные константы;
- макросы для удобства использования программы;
- предварительно объявлены некоторые функции.
Особо примечательного нечего нет, судите сами:
#ifndef _SCHAT_LOCAL_H_ #define _SCHAT_LOCAL_H #include <iostream> #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <sys/epoll.h> #include <fcntl.h> #include <errno.h> #include <unistd.h> #include <stdio.h> #include <list> #include <time.h> // Default buffer size #define BUF_SIZE 1024 // Default port #define SERVER_PORT 44444 // seChat server ip, you should change it to your own server ip address #define SERVER_HOST "192.168.34.15" // Default timeout - http://linux.die.net/man/2/epoll_wait #define EPOLL_RUN_TIMEOUT -1 // Count of connections that we are planning to handle (just hint to kernel) #define EPOLL_SIZE 10000 // First welcome message from server #define STR_WELCOME "Welcome to seChat! You ID is: Client #%d" // Format of message population #define STR_MESSAGE "Client #%d>> %s" // Warning message if you alone in server #define STR_NOONE_CONNECTED "Noone connected to server except you!" // Commad to exit #define CMD_EXIT "EXIT" // Macros - exit in any error (eval < 0) case #define CHK(eval) if(eval < 0){perror("eval"); exit(-1);} // Macros - same as above, but save the result(res) of expression(eval) #define CHK2(res, eval) if((res = eval) < 0){perror("eval"); exit(-1);} // Preliminary declaration of functions int setnonblocking(int sockfd); void debug_epoll_event(epoll_event ev); int handle_message(int new_fd); int print_incoming(int fd); #endif
utils.h
Дополнительные утилиты
#include "local.h"
// Debug epoll_event
void debug_epoll_event(epoll_event ev){
printf("fd(%d), ev.events:", ev.data.fd);
if(ev.events & EPOLLIN)
printf(" EPOLLIN ");
if(ev.events & EPOLLOUT)
printf(" EPOLLOUT ");
if(ev.events & EPOLLET)
printf(" EPOLLET ");
if(ev.events & EPOLLPRI)
printf(" EPOLLPRI ");
if(ev.events & EPOLLRDNORM)
printf(" EPOLLRDNORM ");
if(ev.events & EPOLLRDBAND)
printf(" EPOLLRDBAND ");
if(ev.events & EPOLLWRNORM)
printf(" EPOLLRDNORM ");
if(ev.events & EPOLLWRBAND)
printf(" EPOLLWRBAND ");
if(ev.events & EPOLLMSG)
printf(" EPOLLMSG ");
if(ev.events & EPOLLERR)
printf(" EPOLLERR ");
if(ev.events & EPOLLHUP)
printf(" EPOLLHUP ");
if(ev.events & EPOLLONESHOT)
printf(" EPOLLONESHOT ");
printf("\n");
}
// Setup nonblocking socket
int setnonblocking(int sockfd)
{
CHK(fcntl(sockfd, F_SETFL, fcntl(sockfd, F_GETFD, 0)|O_NONBLOCK));
return 0;
}
server.cpp
Кодировать сервер было проще всего. Задача сервера предельна ясна, получать сообщения от клиента и делать массовые рассылки другим( если таковые есть на сервере) или отправить предупреждение отправителю об их отсутствии.
#include "local.h"
#include "utils.h"
using namespace std;
// To store client's socket list
list<int> clients_list;
// for debug mode
int DEBUG_MODE = 0;
int main(int argc, char *argv[])
{
// *** Define debug mode
// any additional parameres on startup
// i.e. like './server f' or './server debug'
// we will switch to switch to debug mode(very simple anmd useful)
if(argc > 1) DEBUG_MODE = 1;
if(DEBUG_MODE){
printf("Debug mode is ON!\n");
printf("MAIN: argc = %d\n", argc);
for(int i=0; i<argc; i++)
printf(" argv[%d] = %s\n", i, argv[i]);
}else printf("Debug mode is OFF!\n");
// *** Define values
// main server listener
int listener;
// define ip & ports for server(addr)
// and incoming client ip & ports(their_addr)
struct sockaddr_in addr, their_addr;
// configure ip & port for listen
addr.sin_family = PF_INET;
addr.sin_port = htons(SERVER_PORT);
addr.sin_addr.s_addr = inet_addr(SERVER_HOST);
// size of address
socklen_t socklen;
socklen = sizeof(struct sockaddr_in);
// event template for epoll_ctl(ev)
// storage array for incoming events from epoll_wait(events)
// and maximum events count could be EPOLL_SIZE
static struct epoll_event ev, events[EPOLL_SIZE];
// watch just incoming(EPOLLIN)
// and Edge Trigged(EPOLLET) events
ev.events = EPOLLIN | EPOLLET;
// chat message buffer
char message[BUF_SIZE];
// epoll descriptor to watch events
int epfd;
// to calculate the execution time of a program
clock_t tStart;
// other values:
// new client descriptor(client)
// to keep the results of different functions(res)
// to keep incoming epoll_wait's events count(epoll_events_count)
int client, res, epoll_events_count;
// *** Setup server listener
// create listener with PF_INET(IPv4) and
// SOCK_STREAM(sequenced, reliable, two-way, connection-based byte stream)
CHK2(listener, socket(PF_INET, SOCK_STREAM, 0));
printf("Main listener(fd=%d) created! \n",listener);
// setup nonblocking socket
setnonblocking(listener);
// bind listener to address(addr)
CHK(bind(listener, (struct sockaddr *)&addr, sizeof(addr)));
printf("Listener binded to: %s\n", SERVER_HOST);
// start to listen connections
CHK(listen(listener, 1));
printf("Start to listen: %s!\n", SERVER_HOST);
// *** Setup epoll
// create epoll descriptor
// and backup store for EPOLL_SIZE of socket events
CHK2(epfd,epoll_create(EPOLL_SIZE));
printf("Epoll(fd=%d) created!\n", epfd);
// set listener to event template
ev.data.fd = listener;
// add listener to epoll
CHK(epoll_ctl(epfd, EPOLL_CTL_ADD, listener, &ev));
printf("Main listener(%d) added to epoll!\n", epfd);
// *** Main cycle(epoll_wait)
while(1)
{
CHK2(epoll_events_count,epoll_wait(epfd, events, EPOLL_SIZE, EPOLL_RUN_TIMEOUT));
if(DEBUG_MODE) printf("Epoll events count: %d\n", epoll_events_count);
// setup tStart time
tStart = clock();
for(int i = 0; i < epoll_events_count ; i++)
{
if(DEBUG_MODE){
printf("events[%d].data.fd = %d\n", i, events[i].data.fd);
debug_epoll_event(events[i]);
}
// EPOLLIN event for listener(new client connection)
if(events[i].data.fd == listener)
{
CHK2(client,accept(listener, (struct sockaddr *) &their_addr, &socklen));
if(DEBUG_MODE) printf("connection from:%s:%d, socket assigned to:%d \n",
inet_ntoa(their_addr.sin_addr),
ntohs(their_addr.sin_port),
client);
// setup nonblocking socket
setnonblocking(client);
// set new client to event template
ev.data.fd = client;
// add new client to epoll
CHK(epoll_ctl(epfd, EPOLL_CTL_ADD, client, &ev));
// save new descriptor to further use
clients_list.push_back(client); // add new connection to list of clients
if(DEBUG_MODE) printf("Add new client(fd = %d) to epoll and now clients_list.size = %d\n",
client,
clients_list.size());
// send initial welcome message to client
bzero(message, BUF_SIZE);
res = sprintf(message, STR_WELCOME, client);
CHK2(res, send(client, message, BUF_SIZE, 0));
}else { // EPOLLIN event for others(new incoming message from client)
CHK2(res,handle_message(events[i].data.fd));
}
}
// print epoll events handling statistics
printf("Statistics: %d events handled at: %.2f second(s)\n",
epoll_events_count,
(double)(clock() - tStart)/CLOCKS_PER_SEC);
}
close(listener);
close(epfd);
return 0;
}
// *** Handle incoming message from clients
int handle_message(int client)
{
// get row message from client(buf)
// and format message to populate(message)
char buf[BUF_SIZE], message[BUF_SIZE];
bzero(buf, BUF_SIZE);
bzero(message, BUF_SIZE);
// to keep different results
int len;
// try to get new raw message from client
if(DEBUG_MODE) printf("Try to read from fd(%d)\n", client);
CHK2(len,recv(client, buf, BUF_SIZE, 0));
// zero size of len mean the client closed connection
if(len == 0){
CHK(close(client));
clients_list.remove(client);
if(DEBUG_MODE) printf("Client with fd: %d closed! And now clients_list.size = %d\n", client, clients_list.size());
// populate message around the world
}else{
if(clients_list.size() == 1) { // this means that noone connected to server except YOU!
CHK(send(client, STR_NOONE_CONNECTED, strlen(STR_NOONE_CONNECTED), 0));
return len;
}
// format message to populate
sprintf(message, STR_MESSAGE, client, buf);
// populate message around the world ;-)...
list<int>::iterator it;
for(it = clients_list.begin(); it != clients_list.end(); it++){
if(*it != client){ // ... except youself of course
CHK(send(*it, message, BUF_SIZE, 0));
if(DEBUG_MODE) printf("Message '%s' send to client with fd(%d) \n", message, *it);
}
}
if(DEBUG_MODE) printf("Client(%d) received message successfully:'%s', a total of %d bytes data...\n",
client,
buf,
len);
}
return len;
}
client.cpp
Главная проблема клиентской части - одновременно следить за новыми сообщения как от пользователя, так и от сервера. И я решил ее созданием двух процессов (родительского и дочернего, через fork) для того что бы:
- дочерний процесс - ожидал ввода сообщения от пользователя;
- родительский процесс - ожидал новых сообщений как от сервера, так и дочернего процесса используя все то же epoll.
Связь между дочерним и родительским процессом осуществляется через pipe (в 'man pipe' есть отличный пример как это сделать).
#include "local.h"
#include "utils.h"
using namespace std;
// chat message buffer
char message[BUF_SIZE];
// for debug mode
int DEBUG_MODE = 0;
/*
We use 'fork' to make two process.
Child process:
- waiting for user's input message;
- and sending all users messages to parent process through pipe.
('man pipe' has good example how to do it)
Parent process:
- wating for incoming messages(EPOLLIN):
-- from server(socket) to display;
-- from child process(pipe) to transmit to server(socket)
*/
int main(int argc, char *argv[])
{
// *** Define debug mode
// any additional parameres on startup
// i.e. like './client f' or './client debug'
// we will switch to debug mode(very simple anmd useful)
if(argc > 1) DEBUG_MODE = 1;
if(DEBUG_MODE){
printf("Debug mode is ON!\n");
printf("MAIN: argc = %d\n", argc);
for(int i=0; i<argc; i++)
printf(" argv[%d] = %s\n", i, argv[i]);
}else printf("Debug mode is OFF!\n");
// *** Define values
// socket connection with server(sock)
// process ID(pid)
// pipe between chils & parent processes(pipe_fd)
// epoll descriptor to watch events
int sock, pid, pipe_fd[2], epfd;
// define ip & ports for server(addr)
struct sockaddr_in addr;
addr.sin_family = PF_INET;
addr.sin_port = htons(SERVER_PORT);
addr.sin_addr.s_addr = inet_addr(SERVER_HOST);
// event template for epoll_ctl(ev)
// storage array for incoming events from epoll_wait(events)
// and maximum events count could be 2
// 'sock' from server and 'pipe' from parent process(user inputs)
static struct epoll_event ev, events[2]; // Socket(in|out) & Pipe(in)
ev.events = EPOLLIN | EPOLLET;
// if it's zero, we should shoud down client
int continue_to_work = 1;
// *** Setup socket connection with server
CHK2(sock,socket(PF_INET, SOCK_STREAM, 0));
CHK(connect(sock, (struct sockaddr *)&addr, sizeof(addr)) < 0);
// *** Setup pipe to send messages from child process to parent
CHK(pipe(pipe_fd));
if(DEBUG_MODE) printf("Created pipe with pipe_fd[0](read part): %d and pipe_fd[1](write part): % d\n",
pipe_fd[0],
pipe_fd[1]);
// *** Create & configure epoll
CHK2(epfd,epoll_create(EPOLL_SIZE));
if(DEBUG_MODE) printf("Created epoll with fd: %d\n", epfd);
// add server connetion(sock) to epoll to listen incoming messages from server
ev.data.fd = sock;
CHK(epoll_ctl(epfd, EPOLL_CTL_ADD, sock, &ev));
if(DEBUG_MODE) printf("Socket connection (fd = %d) added to epoll\n", sock);
// add read part of pipe(pipe_fd[0]) to epoll
// to listen incoming messages from child process
ev.data.fd = pipe_fd[0];
CHK(epoll_ctl(epfd, EPOLL_CTL_ADD, pipe_fd[0], &ev));
if(DEBUG_MODE) printf("Pipe[0] (read) with fd(%d) added to epoll\n", pipe_fd[0]);
// Fork
CHK2(pid,fork());
switch(pid){
case 0: // child process
close(pipe_fd[0]); // we dont need read pipe anymore
printf("Enter 'exit' to exit\n");
while(continue_to_work){
bzero(&message, BUF_SIZE);
fgets(message, BUF_SIZE, stdin);
// close while cycle for 'exit' command
if(strncasecmp(message, CMD_EXIT, strlen(CMD_EXIT)) == 0){
continue_to_work = 0;
// send user's message to parent process
}else CHK(write(pipe_fd[1], message, strlen(message) - 1));
}
break;
default: //parent process
close(pipe_fd[1]); // we dont need write pipe anymore
// incoming epoll_wait's events count(epoll_events_count)
// results of different functions(res)
int epoll_events_count, res;
// *** Main cycle(epoll_wait)
while(continue_to_work) {
CHK2(epoll_events_count,epoll_wait(epfd, events, 2, EPOLL_RUN_TIMEOUT));
if(DEBUG_MODE) printf("Epoll events count: %d\n", epoll_events_count);
for(int i = 0; i < epoll_events_count ; i++){
bzero(&message, BUF_SIZE);
// EPOLLIN event from server( new message from server)
if(events[i].data.fd == sock){
if(DEBUG_MODE) printf("Server sends new message!\n");
CHK2(res,recv(sock, message, BUF_SIZE, 0));
// zero size of result means the server closed connection
if(res == 0){
if(DEBUG_MODE) printf("Server closed connection: %d\n", sock);
CHK(close(sock));
continue_to_work = 0;
}else printf("%s\n", message);
// EPOLLIN event from child process(user's input message)
}else{
if(DEBUG_MODE) printf("New pipe event!\n");
CHK2(res, read(events[i].data.fd, message, BUF_SIZE));
// zero size of result means the child process going to exit
if(res == 0) continue_to_work = 0; // exit parent to
// send message to server
else{
CHK(send(sock, message, BUF_SIZE, 0));
}
}
}
}
}
if(pid){
if(DEBUG_MODE) printf("Shutting down parent!\n");
close(pipe_fd[0]);
close(sock);
}else{
if(DEBUG_MODE) printf("Shutting down child!\n");
close(pipe_fd[1]);
}
return 0;
}
tester.cpp
Программа для тестирования:
- открывает одновременно EPOLL_SIZE соединений с сервером (в моем случае EPOLL_SIZE = 10000);
- получает по каждому соединению отдельное "приветствие сервера";
- закрывает все соединения;
- и выводит небольшую статистику о своей работе.
Все просто!
#include "local.h"
#include "utils.h"
using namespace std;
// to keep message from server
char message[BUF_SIZE];
// for debuf mode
int DEBUG_MODE = 0;
// to store client's sockets list
list<int> list_of_clients;
// to keep result of different functions
int res;
// to calculate the execution time of a program
clock_t tStart;
int main(int argc, char *argv[])
{
// *** Define debug mode
// any additional parameres on startup
// i.e. like './server f' or './server debug'
// we will switch to switch to debug mode(very simple anmd useful)
if(argc > 1) DEBUG_MODE = 1;
if(DEBUG_MODE){
printf("Debug mode is ON!\n");
printf("MAIN: argc = %d\n", argc);
for(int i=0; i<argc; i++)
printf(" argv[%d] = %s\n", i, argv[i]);
}else printf("Debug mode is OFF!\n");
// *** Define values
// connetion with server
int sock;
// define address & port of server
struct sockaddr_in addr;
addr.sin_family = PF_INET;
addr.sin_port = htons(SERVER_PORT);
addr.sin_addr.s_addr = inet_addr(SERVER_HOST);
// setup start time
tStart = clock();
// create EPOLL_SIZE connections with server
for(int i=0 ; i<EPOLL_SIZE; i++){
// create new socket connection with server
CHK2(sock,socket(PF_INET, SOCK_STREAM, 0));
CHK(connect(sock, (struct sockaddr *)&addr, sizeof(addr)) < 0);
list_of_clients.push_back(sock);
if(DEBUG_MODE) printf("Create new test client with fd: %d\n", sock);
// Get welcome messge from server!
bzero(&message, BUF_SIZE);
CHK2(res,recv(sock, message, BUF_SIZE, 0));
printf("%s\n", message);
}
// close all connections
list<int>::iterator it;
for(it = list_of_clients.begin(); it != list_of_clients.end() ; it++)
close(*it);
// print statistics
printf("Test passed at: %.2f second(s)\n", (double)(clock() - tStart)/CLOCKS_PER_SEC);
printf("Total server connections was: %d\n", EPOLL_SIZE);
return 0;
}
Заключение
Лучше один раз увидеть, чем сто раз услышать - тут картинка с результатом работы программы-теста.
ИТОГИ:
- По результатам тестов на моей машине (виртуальная CentOS 5.2 на Proxmox c оперативкой 1Гб и одним выделенным процессором) сервер обработал около 7000 соединений за четверть секунды. По моему не плохо!
- Для удобства добавил все исходники в Google Code открыв новый проект sechat, тестируйте/пользуйтесь кому интересно.
Приветствуются любая критика и замечания - maksud.nurullaev[DOG]gmail.com
P.S.
- если при запуске теста, система будет ругаться на большое количество одновременно открытых файлов(дескрипторов), проверьте свои лимиты через ulimit -n и измените на подходящее значение;
- тексты программ изобилуют комментариями, но если будут какие то вопросы или пожелания о детализации и переводе, нет проблем, сделаю как только освобожусь в ближайшее время;
- я создал проект ТОЛЬКО ЛИШЬ для тестирования epoll, а не для создания "шедеврального" чата, так что заранее прошу извинить за примитивность кода.
--Maksud 05:17, 8 апреля 2010 (UTC)