Оригинальный DVD-ROM: eXeL@B DVD !
eXeL@B ВИДЕОКУРС !

ВИДЕОКУРС ВЗЛОМ
выпущен 8 мая!


УЗНАТЬ БОЛЬШЕ >>
Домой | Статьи | RAR-cтатьи | FAQ | Форум | Скачать | Видеокурс
Новичку | Ссылки | Программирование | Интервью | Архив | Связь


<< ВЕРНУТЬСЯ В ПОДРАЗДЕЛ

<< ВЕРНУТЬСЯ В ОГЛАВЛЕНИЕ




Материалы находятся на сайте https://exelab.ru/pro/




ПРОГРАММИРОВАНИЕ НА C и С++



Возможности языков семейства Си по истине безграничны, однако, в этой свободе кроются и недостатки: всегда нужно программисту держать ухо востро и контроллировать "переполнение буфера", чтобы потом программа не вылетала в "синий экран" на массе разнообразных версий Windows и железа у пользователей. Те же крэкеры и реверсеры специально ищут в коде программ на Си уязвимости, куда можно подсадить любой вирусный код, об этом более подробно автор рассказывал в своём видеокурсе здесь. Я там многое узнал и теперь мой код стал значительно более безопасный.

Сервер TCP/IP... много серверов хороших и разных.

Чаще всего, если это не приходится делать очень часть (т.е. не является основной спецификой работы), при необходимости написания TCP/IP сервера используются одна из двух <классических> технологий: последовательный сервер, или параллельный сервер на основе fork() (Windows-программисты в этом случае пишут сервер на основе thread). Хотя реально можно предложить гораздо больше принципиально различных серверов, которые будут существенно отличаться своей сложностью, временем реакции на запрос клиента и т.д. Ниже описано несколько из таких способов с результатами их тестирования. Программы делались и испытывались в OS QNX 6.2.1, но могут (за исключением специально оговоренного случая) практически без изменений использоваться в любой UNIX-like OS, а за некоторым исключением - и в Windows.

 

1. Постановка задачи: мы напишем специальный тестовый TCP/IP клиент, который посылает требуемое число раз запрос к серверу (ретранслятору), принимает от него ответ, и тут же разрывает соединение. Серия запросов от клиента делается для усреднения результата и для того (как будет видно далее), чтобы исключить (или учесть) эффекты кэширования памяти. Клиент измеряет время (точнее - число циклов процессора) между отправкой запроса серверу и приходом ответа от него. Сервера в этом анализе являются простыми ретрансляторами. Все показанные программы - предложены в упрощённых вариантах: не везде сделана полная обработка ошибочных ситуаций (что, вообще-то говоря, крайне необходимо), и сознательно не включена  обработка сигнала SIGCHLD, которая должна препятствовать появлению <зомби> процессов. Все приводимые коды программ - работающие и апробированные: весь результирующий вывод скопирован непосредственно с консоли задачи. Весь приводимый программный код транслировался компилятором gcc-2-95 в нотации языка C++ (хотя специфические особенности С++, за исключением потокового ввода-вывода С++ и не использованы).

 

2. Клиент. Собственно клиент размещён в файле cli.cpp, но он, совместно с сервером, использует общие файлы common.h & common.cpp, все эти файлы с краткими комментариями приведены ниже:

 

common.h - в этом файле определены различные порты TCP, по которым клиент будет связываться с различными модификациями серверов. Кроме того, здесь определены:

- функция завершения по критической ошибке;

- единая процедура ретрансляции через сокет, которую используют все сервера(для единообразия и корректности сравнений);

- функция подготовки прослушивающего сокета TCP/IP (для того, чтобы устранить этот, достаточно объёмный, код из кода серверов, рассматриваемых ниже).

 

#if !defined( __COMMON_H )

#define __COMMON_H

#include <stdlib.h>

#include <stdio.h>

#include <errno.h>

#include <unistd.h>

#include <string.h>

#include <iostream.h>

#include <netdb.h>

const int PORT = 9000,                      /* программа: */

              SINGLE_PORT = PORT,              /* ech0  */

              FORK_PORT = PORT + 1,            /* ech1  */

              FORK_LARGE_PORT = PORT + 2,      /* ech10 */

              PREFORK_PORT = PORT + 3,         /* ech11 */

              INET_PORT = PORT + 4,            /* ech3  */

              THREAD_PORT = PORT + 5,          /* ech2  */

              THREAD_POOL_PORT = PORT + 6,     /* ech21 */

              PRETHREAD_PORT = PORT + 7;       /* ech22 */

const int MAXLINE = 40;

// критическая ошибка ...

void errx( const char *msg, int err = EOK );

// ретранслятор тестовых пакетов TCP

void retrans( int sc );

// создание и подготовка прослушивающего сокета

int getsocket( in_port_t );

#endif

 common.cpp - реализационная часть:

 

#include "common.h"

// ошибка ...

void errx( const char *msg, int err = EOK ) {

    perror( msg );

    if( err != EOK ) errno = err;

    exit( EXIT_FAILURE );

};

// ретранслятор тестовых пакетов TCP

static char data[ MAXLINE ];

void retrans( int sc ) {

   int rc = read( sc, data, MAXLINE );

   if( rc > 0 ) {

      rc = write( sc, data, strlen( data ) + 1 );

      if ( rc < 0 ) perror( "write data failed" );

   }

   else if( rc < 0 ) { perror( "read data failed" ); return; }

   else if( rc == 0 ) { cout << "client closed connection" << endl; return; };

   return;

};

// создание и подготовка прослушивающего сокета

struct sockaddr_in addr;

int getsocket(  in_port_t p ) {

   int rc = 1, ls;

   if( ( ls = socket( AF_INET, SOCK_STREAM, 0 ) ) = -1 )

      errx( "create stream socket failed" );

   if( setsockopt( ls, SOL_SOCKET, SO_REUSEADDR, &rc, sizeof( rc ) ) != 0 )

      errx( "set socket option failed" );

   memset( &addr, 0, sizeof( addr ) );

   addr.sin_len = sizeof( addr );

   addr.sin_family = AF_INET;

   addr.sin_port = htons( p );

   addr.sin_addr.s_addr = htonl( INADDR_ANY );

   if( bind( ls, (struct sockaddr*)&addr, sizeof( sockaddr ) ) != 0 )

      errx( "bind socket address failed" );

   if( listen( ls, 25 ) != 0 ) errx( "put socket in listen state failed" );

   return ls;

};

 cli.cpp - код клиента:

 #include <inttypes.h>

#include <sys/neutrino.h>

#include <sys/syspage.h>

#include <sys/procfs.h>

#include "common.h"

// установка параметров клиентов: порт и число повторений

static void setkey( int argc, char *argv[], in_port_t* port, int* num ) {

    int opt, val;

    while ( ( opt = getopt( argc, argv, "p:n:") ) != -1 ) {

        switch( opt ) {

            case 'p' :

                if( sscanf( optarg, "%i", &val ) != 1 )

                   errx( "parse command line failed", EINVAL );

                *port = (in_port_t)val;

                break;

            case 'n' :

                if( ( sscanf( optarg, "%i", &val ) != 1 ) || ( val <= 0 ) )

                   errx( "parse command line failed", EINVAL );

                *num = val;

                break;

            default :

                errx( "parse command line failed", EINVAL );

                break;

        };

    };

};

// клиент - источник потока тестовых пакетов TCP

int main( int argc, char *argv[] ) {

   in_port_t listen_port = SINGLE_PORT;

   int num = 10;

   setkey( argc, argv, &listen_port, &num );

   char data[ MAXLINE ], echo[ MAXLINE ];  

   uint64_t cps = cps = SYSPAGE_ENTRY( qtime )->cycles_per_sec;

   cout << "TCP port = " << listen_port << ", number of echoes = " << num << endl

           << "time of reply - Cycles [usec.] :" << endl;

   for( int i = 0; i < num; i++ ) {

      int rc, ls;

      if( ( ls = socket( AF_INET, SOCK_STREAM, 0 ) ) < 0 )

         errx( "create stream socket failed" );

      struct sockaddr_in addr;

      memset( &addr, 0, sizeof( addr ) );

      addr.sin_len = sizeof( addr );

      addr.sin_family = AF_INET;

      addr.sin_port = htons( listen_port );

      inet_aton( "localhost", &addr.sin_addr );

      if( ( rc = connect( ls, (struct sockaddr*)&addr, sizeof( sockaddr ) ) ) < 0 )

         errx( "connect failed" );

      sprintf( data, "%d", rand() );

      uint64_t cycle = ClockCycles();

      if( ( rc = write( ls, data, strlen( data ) + 1 ) ) <= 0 )

         errx( "write data failed" );

      rc = read( ls, echo, MAXLINE );

      cycle = ClockCycles() - cycle;    

      if( rc < 0 ) errx( "read data failed" );

      if( rc == 0 ) errx( "server closed connection" );

      if( strcmp( data, echo ) != 0 ) { cout << "wrong data" << endl; break; };

      cout << cycle << "[" << cycle * 1000000 / cps << "]";

      if( i % 5 == 4 ) cout << endl; else cout << '\t'; cout << flush;

      close( ls );

      delay( 100 );      

   };

   if( num % 5 != 0 ) cout << endl;

   exit( EXIT_SUCCESS );

};

 После запуска клиент анализирует ключи запуска. Предусмотрены значения: <-p> значение порта подключения (по умолчанию - последовательный сервер, порт 9000), и <-n> - число запросов к серверу в серии (по умолчанию - 10). Каждый запрос представляет собой случайное число, генерируемое клиентом, в символьной форме. Ретранслированный сервером ответ сверяется с запросом для дополнительного контроля. Клиент подключается к серверу по петлевому интерфейсу 127.0.0.1, что вполне достаточно для сравнительного анализа. Далее мы рассмотрим его работу с различными серверами.

 

3. Последовательный сервер ретранслятор. Такой сервер нас интересует только как эталон для сравнения: он имеет минимальное время реакции, т.к. не затрачивается время на порождение каких-либо механизмов параллелизма. С другой стороны, такой сервер, зачастую, просто неинтересен, т.к. не позволяет обслуживать других клиентов до завершения текущего обслуживания.

 

Все сервера имеют крайне простой код, потому что большая часть <рутины> снесена в файлы common (h & cpp). Вот код 1-го используемого нами - последовательного сервера (файл ech0.cpp):

 #include "common.h"

int main( int argc, char *argv[] ) {

   int ls = getsocket( SINGLE_PORT ), rs;

   while( true ) {

      if( ( rs = accept( ls, NULL, NULL ) ) < 0 ) errx( "accept error" );

      retrans( rs );

      close( rs );

      cout << "*" << flush;

   };

   exit( EXIT_SUCCESS );

};

 Вот результаты выполнения клиента с этим сервером (указано число машинных циклов ожидания, а в скобках - для справки - время в микросекундах для процессора Celeron 533Mhz):

 

/root/ForkThread # cli -p9000 -n20

TCP port = 9000, number of echoes = 20

time of reply - Cycles [usec.] :

868325[1624]    135364[253]     135287[253]     133438[249]     133057[248]

136061[254]     133554[249]     133887[250]     138776[259]     131237[245]

134748[252]     133823[250]     135650[253]     130583[244]     134562[251]

132601[248]     134622[251]     134516[251]     132055[246]     134139[250]

Отчётливо виден (1-й запрос) эффект, который мы отнесли к эффектам кэширования памяти программ - различие времени выполнения первого и последующих запросов. В каталоге проекта есть ещё один (тестовый) вариант последовательного сервера, код  которого выглядит несколько иначе:

 #include <sys/neutrino.h>

#include "common.h"

int main( int argc, char *argv[] ) {

   int ls = getsocket( SINGLE_PORT ), rs, i = 0;

   while( true ) {

      if( ( rs = accept( ls, NULL, NULL ) ) < 0 ) errx( "accept error" );

      uint64_t cycle = ClockCycles();

      retrans( rs );

      cycle = ClockCycles() - cycle;

      close( rs );

      cout << cycle;

      if( i++ % 5 == 4 ) cout << endl; else cout << '\t'; cout << flush;

   };

   exit( EXIT_SUCCESS );

};

 Он отличается тем, что <хронометрирует> (для справки) оценочно число циклов на ретрансляцию (затрачиваемые внутри сервера). Все типы серверов используют общую процедуру retrans() и единые затраты <чистого времени>. Приведём для справки эти оценки (только машинные циклы):

 /root/ForkThread # ech0_

757808  60862   60085   60444   60197

61111   60565   60154   59121   59984

 Видно, что это время составляет около 50% времени, наблюдаемого со стороны клиента, которое включает в себя время реакции на accept() (со стороны сервера), 2-кратные затраты write() + read() (со стороны как клиента, так и сервера), время передачи буферов по петлевому интерфейсу и т.п.

 4. <Классический> параллельный сервер. Ниже приведен код такого <классического> сервера (в отличающейся части), в котором обслуживающий процесс порождается fork() после разблокирования на accept()(файл ech1.cpp):

 #include "common.h"

int main( int argc, char *argv[] ) {

   int ls = getsocket( FORK_PORT ), rs; 

   while( true ) {

      if( ( rs = accept( ls, NULL, NULL ) ) < 0 ) errx( "accept error" );

      pid_t pid = fork();

      if( pid < 0 ) errx( "fork error" );

      if( pid == 0 ) {

         close( ls );

         retrans( rs );

         close( rs );

         cout << "*" << flush;                

         exit( EXIT_SUCCESS );

      }

      else close( rs );

   };

   exit( EXIT_SUCCESS );

};

 

После выхода из accept() (получение запроса connect() от клиента) - порождается отдельный обслуживающий процесс, который тут же закрывает свою копию прослушивающего сокета, производит ретрансляцию через соединённый сокет, завершает соединение и завершается сам. Родительский же процесс закрывает свою копию соединённого сокета и продолжает прослушивание канала. Вот результаты выполнения такого сервера:

 

/root/ForkThread # cli -p9001 -n20

TCP port = 9001, number of echoes = 20

time of reply - Cycles [usec.] :

2219652[4151]   1467470[2744]   1470056[2749]   1466860[2743]   1469294[2748]

1466875[2743]   1467612[2745]   1489083[2785]   1475620[2759]   1665398[3114]

1472091[2753]   1471635[2752]   1481768[2771]   1462214[2734]   1467229[2744]

1468731[2747]   1466483[2742]   1465499[2741]   1461780[2734]   1649821[3085]

 

Да . . . время реакции больше чем на порядок превышает простой последовательный сервер. Видно заметно меньше (относительно) выраженный эффект кэширования - вновь создаваемое адресное пространство процесса повторно не используется, однако некоторое влияние кэширования сказывается (в программе на стороне клиента?).  Добавим в код сервера 1 строчку - перед точкой main (файл ech10.cpp - и изменён порт):

 

static long MEM[ 2500000 ];

 

/root/ForkThread # cli -p9002

TCP port = 9002, number of echoes = 10

time of reply - Cycles [usec.] :

67061908[125432]   64674322[120966]   64126835[119942]   63071907[117969]   64185096[120051]

65478368[122470]   64495464[120632]   64533852[120703]   63831652[119390]   64407915[120468]

 

Строки вывода перенесены мною, потому, что он уже не помещаются в формат страницы: время реакции увеличилось почти в 50 раз, превышает время реакции простейшего последовательного сервера уже почти на 3 порядка (500 раз, или 1000 раз по <чистому> времени обслуживания), и составляет уже 0.12 секунды на каждый запрос. Что произошло? При порождении нового процесса по fork() (можно считать, что здесь затраты не столь большие - из предыдущей таблицы: порядка 1.5 млн. циклов) - OS обязана перекопировать образ задачи (к которой мы добавили ~20Mb) из адресного пространства одного процесса, а пространство другого. И не посредством memcpy(), а запросами к ядру системы, потому как копирование идёт между различными защищёнными образами!

 

Какие предварительные итоги можно сделать из рассматриваемых результатов? Во-первых, то, что OS QNX определённо не использует технику <copy on write> (COW) для копирования образов порождаемых по fork копий процессов, а, во-вторых, : меняет ли что-то принципиально применение COW в других OS, например в Linux? Думаю, что <скорее нет>, т.к. радикальное снижение начального времени реакции (времени латентности) при использовании COW оборачивается только скрытием тех же затрат, но <распределённых> по интервалу обслуживанию. Т.е., использование COW эффективно только как <рекламный>, <рыночный> трюк, рассчитанный на гипнотическое воздействие на конечного потребителя некоторых <магических> тестовых цифр: и уж категорически неприменимо для realtime OS, поведение которых во времени должно быть строго детерминировано. 

 

5. Параллельный сервер с предварительным созданием копий. Так что же получается: для серверов, работающих на высоко интенсивных потоках запросов, с традиционным fork-методом всё так плохо? Отнюдь! Нужно только поменять fork & accept местами - создать заранее некоторый пул обслуживающих процессов, каждый из которых до прихода клиентского запроса будет заблокирован на accept (кстати - accept на одном и том же прослушиваемом сокете). А после отработки клиентского запроса заблаговременно создать новый обслуживающий процесс. Эта техника известна как <предварительный fork> или pre-fork. Меняем текст сервера (файл ech11.cpp):

 

#include "common.h"

const int NUMPROC = 3;

int main( int argc, char *argv[] ) {

   int ls = getsocket( PREFORK_PORT ), rs;       

   for( int i = 0; i < NUMPROC; i++ ) { 

      if( fork() == 0 ) {

         int rs;       

         while( true ) {

            if( ( rs = accept( ls, NULL, NULL ) ) < 0 ) errx( "accept error" );

            retrans( rs );

            close( rs );

            cout << i << flush;

            delay( 250 );

         };

      };

   };

   for( int i = 0; i < NUMPROC; i++ ) waitpid( 0, NULL, 0 );

   exit( EXIT_SUCCESS );

};

 

При написании этого текста я несколько <схитрил> и упростил в сравнении с предложенной абзацем выше моделью. Здесь 3 обслуживающих процесса сделаны циклическими и не завершаются по окончанию обслуживания, а снова блокируются на accept, но для наблюдения эффектов этого вполне достаточно (последняя строка нужна вообще только для блокировки родительского процесса, и <сохранения> управляющего терминала - для возможности прекращения всей группы по ^C):

 

# pidin

...

6901868   1 ./ech11             10r REPLY       94228

6901869   1 ./ech11             10r REPLY       94228

6901870   1 ./ech11             10r REPLY       94228

 

/root/ForkThread # cli -p9003

TCP port = 9003, number of echoes = 10

time of reply - Cycles [usec.] :

854276[1597]    138356[258]     135665[253]     131656[246]     136653[255]

132532[247]     133583[249]     134639[251]     136363[255]     131482[245]

 

Время реакции - практически равно последовательному серверу, чего мы и добивались. В этой программе добавлен вывод идентификатора (i) обрабатывающего процесса (предыдущие сервера выводили только символ <*> для идентификации факта обработки запроса). Для этого добавлена и задержка <пере-активизации> процесса delay(250) - больше 2-х периодов запросов клиентов, чтоб заставить обрабатывающие процессы чередоваться. Вот возможный вид протокола сервера:

 

/root/ForkThread # 2012012012201201201220120120122012012012

 

Хорошо видно нарушение периодичности последовательности идентификационных номеров

процессов: после периода простоя всегда обслуживание осуществляется процессом с индексом 2 (максимальным) - при множественном блокировании на acept() первым разблокируется процесс, заблокировавшийся последним (!?).

 

В принципе, не так и сложно в такой схеме сделать и динамический пул процессов, как будет показано ниже для потоков - с той лишь некоторой сложностью, что здесь каждый процесс выполняется в своём закрытом адресном пространстве, и для их взаимной синхронизации придётся использовать что-то из механизмов IPC.

 

6. Прежде, чем переходить к потоковым (thread) реализациям, рассмотрим ещё один fork-вариант: использование суперсервера inetd. При этом весь сервис по запуску процессов-копий нашего приложения, и перенаправлению его стандартных потоков ввода-вывода в сокет - возьмёт на себя inetd. Вот полный текст ретранслирующего сервера для этого случая (файл ech3.cpp):

 

#include <stdio.h>

#include "common.h"

static char data[ MAXLINE ];

void main( void ) { write( STDOUT_FILENO, data, read( STDIN_FILENO, data, MAXLINE )  ); };

 

Просто? Мне кажется, что - очень. Теперь настроим на наше приложение и запустим inetd

 

-  Дописываем в конфигурационный файл /etc/services строку, определяющую порт, через который будет вызываться приложение:

ech3            9004/tcp

 

- В конфигурационный файл файл /etc/inetd.conf добавляем строку, которая определяет режим обслуживания и конкретные параметры вызываемого приложения:

ech3    stream  tcp nowait root /root/ForkThread/ech3 ech3

 

- Запускаем inetd:

/root/ForkThread # inetd &

 

При заполнении строк концигурационных файлов нужна особая тщательность, если заголовки сервисов (ech3) в файлах не будут совпадать, то вы получите просто ошибку связи:

/root/ForkThread # cli -p9004

TCP port = 9004, number of echoes = 10

time of reply - Cycles [usec.] :

connect failed: Connection refused

 

Проверить, что inetd настроен на прослушивание нашего порта можно так:

/etc # netstat -a

Active Internet connections (including servers)

Proto Recv-Q Send-Q  Local Address          Foreign Address        State

...

tcp        0      0  *.ech3                 *.*                    LISTEN

 

Заставить inetd перечитать свои конфигурационные файлы после каждой правки /etc/services или /etc/inetd.conf вы можете, послав ему сигнал SGHUP, например:

/etc # pidin

...

10231922   1 usr/sbin/inetd      10r SIGWAITINFO

/etc # kill -SIGHUP 10231922

 

Если ошибка допущена в полном имени программы сервера (поля 6-7 строки inetd.conf), то мы тоже получим не сразу объяснимый результат:

/root/ForkThread # cli -p9004

TCP port = 9004, number of echoes = 10

time of reply - Cycles [usec.] :

server closed connection

 

... и, наконец, если всё в настройке inetd правильно, то получим нечто похожее:

 

/root/ForkThread # cli -p9004

TCP port = 9004, number of echoes = 10

time of reply - Cycles [usec.] :

16442468[30753] 14169659[26502] 14354292[26848] 14160723[26486] 14187182[26535]

14145131[26457] 14411884[26955] 14761467[27609] 14207573[26573] 14491483[27104]

 

Отметим, что время реакции в несколько раз (до 10-ти) выше прямой реализации с fork (inetd ведь также <скрыто> делает fork), но зато какая простота и трудоёмкость! Характерно почти полное отсутствие эффектов кэширования. Для серверов, обслуживающих <неплотный> поток запросов - это, пожалуй, оптимальное решение (кстати, большинство <штатных> сетевых сервисов UNIX выполняется именно по такой схеме).

 

7. Сервер, использующий pthread_create по запросу обслуживания клиента (файл ech2.cpp):

 

#include <pthread.h>

#include "common.h"

void* echo( void* ps ) {

    int sc = *(int*)ps;

    sched_yield();

    retrans( sc );

    close( sc );

    cout << "*" << flush;                    

    return NULL;

}

int main( int argc, char *argv[] ) {

   int ls = getsocket( THREAD_PORT ), rs;          

   while( true ) {

      if( ( rs = accept( ls, NULL, NULL ) ) < 0 ) errx( "accept error" );

      if( pthread_create( NULL, NULL, &echo, &rs ) != EOK ) errx( "thread create error" );

      sched_yield();     

   };

   exit( EXIT_SUCCESS );

};

 

Минимальные комментарии: 2 вызова sched_yield() (в вызывающем потоке, и, позже, в функции обслуживания созданного потока) - предназначены для гарантии копирования созданным потоком переданного дескриптора сокета до его повторного переопределения в цикле вызывающего потока. Результаты выполнения программы:

 

/root/ForkThread # cli -p9005

TCP port = 9005, number of echoes = 10

time of reply - Cycles [usec.] :

2493948[4664]   266123[497]     269490[504]     279049[521]     267775[500]

266880[499]     288175[539]     268589[502]     267990[501]     267003[499]

 

Это только в 2 раза (в 3, если оценивать по <чистому> времени) хуже простого последовательного сервера. Чрезвычайно сильно выражен эффект кэширования - вся обработка последовательности запросов производится на едином (многократно используемом) пространстве адресов.

 

8. Сервер с предварительным созданием потоков. Поступим по аналогии с pre-fork, и создадим фиксированный пул потоков предварительно (pre-thread, файл ech22.cpp):

 

#include <pthread.h>

#include "common.h"

static int ntr = 3;     /*число thread в пуле*/

static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

static pthread_cond_t condvar = PTHREAD_COND_INITIALIZER;

void* echo( void* ps ) {

    int sc = *(int*)ps, rs;

    sched_yield();

    if( ( rs = accept( sc, NULL, NULL ) ) < 0 ) errx( "accept error" );

    retrans( rs );

    close( rs );

    pthread_mutex_lock( &mutex );

    ntr++;

    pthread_cond_signal( &condvar );

    pthread_mutex_unlock( &mutex );

    cout << pthread_self() << flush;

    delay( 250 );

    return NULL;

}

int main( int argc, char *argv[] ) {

   int ls = getsocket( PRETHREAD_PORT ), rs;             

   while( true ) {

      if( pthread_create( NULL, NULL, &echo, &ls ) != EOK ) errx( "thread create error" );

      sched_yield();     

      pthread_mutex_lock( &mutex );

      ntr--;

      while( ntr <= 0 ) pthread_cond_wait( &condvar, &mutex );

      pthread_mutex_unlock( &mutex );

   };

   exit( EXIT_SUCCESS );

};

 

Здесь accept (как и раньше в случае prefork) перенесен в обрабатывающий поток (все thread блокированы в accept на единственном прослушивающем сокете). Для синхронизации я использую условную переменную, но могут применятся любые из синхронизирующих примитивов. Испытываем полученную программу:

 

/root/ForkThread # cli -p9007

TCP port = 9007, number of echoes = 10

time of reply - Cycles [usec.] :

879988[1645]    134687[251]     137152[256]     136303[254]     693676[1297]

138605[259]     140320[262]     138937[259]     136886[256]     342027[639]

 

Время реакции очень близко к последовательному серверу (к минимально достижимому потенциально!). Потоки обработчики на сервере идентифицируют себя своим tid:

 

/root/ForkThread # ech22

4567891011121314151617181920212223

 

Хорошо видно последовательное порождение нового потока для обработки каждого запроса клиента. Так же сильно, как и в предыдущем случае, выражены эффекты кэширования.

 

9. Можно создать сколь угодно сложный диспетчер, поддерживающий оптимальное число потоков (или процессов) в сервере, но в OS QNX от уже предоставлен как стандартное средство системы: потоковый пул (thread_pool_*). Сервер с использованием динамического пула потоков (файл ech21.cpp):

 

#include <pthread.h>

#include <sys/dispatch.h>

#include "common.h"

static int ls;

THREAD_POOL_PARAM_T *alloc( THREAD_POOL_HANDLE_T *h ) { return (THREAD_POOL_PARAM_T*)h; };

THREAD_POOL_PARAM_T *block( THREAD_POOL_PARAM_T *p ) {

    int rs = accept( ls, NULL, NULL );

    if( rs < 0 ) errx( "accept error" );

    return (THREAD_POOL_PARAM_T*)rs;

};

int handler( THREAD_POOL_PARAM_T *p ) {

    retrans( (int)p );

    close( (int)p );

    delay( 250 );

    cout << pthread_self() << flush;                    

    return 0;

};

int main( int argc, char *argv[] ) {

   ls = getsocket( THREAD_POOL_PORT );                 

   thread_pool_attr_t attr;

   memset( &attr, 0, sizeof( thread_pool_attr_t ) );

   attr.lo_water = 3;     /* заполнение блока атрибутов пула */

   attr.hi_water = 7;

   attr.increment = 2;

   attr.maximum = 9;

   attr.handle = dispatch_create();

   attr.context_alloc = alloc;

   attr.block_func = block;

   attr.handler_func = handler;

   void *tpp = thread_pool_create( &attr, POOL_FLAG_USE_SELF ) ;

   if(  tpp == NULL ) errx( "create pool" );

   thread_pool_start( tpp );

   exit( EXIT_SUCCESS );

};

 

Всё, сервер готов - почти всё необходимое за нас сделала библиотека OS. Грубо, логика работы пула потоков QNX следующая:

- начально создаётся attr.lo_water (<нижняя ватерлиния>) потоков;

- для каждого потока при создании вызывается функция *attr.context_alloc;

- эта функция по завершению вызовет (сама) блокирующую функцию потока *attr.block_func;

- эта функция, после разблокирования (accept) вызовет функцию обработчика *attr.handler_func, которой в качестве параметра (в нашем тексте) передаст дескриптор присоединённого сокета;

- как только число заблокированных потоков станет ниже attr.lo_water - механизм пула создаст дополнительно attr.increment потоков;

- если число блокированных потоков в какой-то момент превысит attr.hi_water (<верхняя ватерлиния>) - <лишние> потоки будут уничтожены;

- . . . и всё это так, чтобы общее число потоков (выполняющиеся + блокированные) не превышало attr.maximum.

 

Это - уникально мощный механизм, с очень широкой функциональностью, но за более детальной информацией я отсылаю всех заинтересованных к технической документации OS QNX. Смотрим это в действии:

 

/root/ForkThread # cli -p9006 -n20

TCP port = 9006, number of echoes = 20

time of reply - Cycles [usec.] :

828384[1549]    139615[261]     142050[265]     144799[270]     143895[269]

146760[274]     142760[267]     145951[272]     142816[267]     144384[270]

144657[270]     159474[298]     147504[275]     147113[275]     145257[271]

146866[274]     153215[286]     145461[272]     145013[271]     145311[271]

 

Результаты очень близкие к максимально возможным! Так же, как и в предыдущих случаях - очень ярко выражен эффект кэширования: вся обработка ведётся на одном и том же, многократно используемом, адресном пространстве.

 

Посмотрим <чередование> tid обрабатывающих потоков:

 

/root/ForkThread # ech21

15315315311731731731731731731776176176176176176176

 

Хорошо видно, что через некоторое время работы число потоков в пуле стабилизируется на уровне 7-ми (<верхней ватерлинии>). Через какое-то время выполнения состояние пула будет примерно таким:

 

/ # pidin

...

10059820   1 ./ech21             10r REPLY       94228

10059820   2 ./ech21             10r REPLY       94228

10059820   3 ./ech21             10r REPLY       94228

10059820   4 ./ech21             10r REPLY       94228

10059820   5 ./ech21             10r REPLY       94228

10059820   6 ./ech21             10r REPLY       94228

10059820   7 ./ech21             10r REPLY       94228

...

 

Как и предсказывает документация - мы имеем 7 блокированных на accept потоков - по <верхнюю ватерлинию>.

 

Достаточно интересно посмотреть состояния ожидающих сокетов при запущенных всех (или почти всех) видах описанных выше серверов - вот возможное начало таблицы netstat:

 

/root/ForkThread # netstat -a

Active Internet connections (including servers)

Proto Recv-Q Send-Q  Local Address          Foreign Address        State

tcp        0      0  *.9005                 *.*                    LISTEN

tcp        0      0  *.9003                 *.*                    LISTEN

tcp        0      0  *.9006                 *.*                    LISTEN

tcp        0      0  *.9002                 *.*                    LISTEN

tcp        0      0  *.9001                 *.*                    LISTEN

tcp        0      0  *.9000                 *.*                    LISTEN

tcp        0      0  *.ech3                 *.*                    LISTEN

 

10. Итоги. Выше рассмотрено 7 различных альтернативных технологий построения сервера TCP/IP. Сравним средние характеристики вариантов по критерию <время задержки реакции> (представляют интерес только порядки величин, сами значения могут радикально <гулять> в зависимости от конкретного вида серверной функции):

 

Тип сервера

Среднее время обслуживания

Время латентности

Последовательный - п.3

135 000

0

fork - п.4

>>1 470 000

>>1 000 000

pre-fork - п.5

133 000

0

inetd - п.6

14 100 000

14 000 000

thread - п.7

267 000

130 000

pre-thread - п.8

140 000

5 000 (~0)

thread pool - п.9

144 000

9 000 (~0)

 

Тем не менее, не следует категорически руководствоваться выбором той или иной технологии построения сервера только исходя из содержимого показанной выше таблицы. В каждом конкретном случае при выборе решения должно учитываться существенно больше факторов: трудоёмкость реализации, потребление ресурсов, в частности RAM (которое мы никак не затрагиваем в нашем рассмотрении), простота отладки и сопровождения etc.

 

P.S. 1. Все упоминаемые в тексте элементы программного кода, или необходимые для их сборки элементы (Makefile) содержаться в составе прилагаемого проекта echsrv.tgz.

 

2. Материал данного рассмотрения непосредственно произошёл от обсуждения подобных вопросов на форуме http://qnx.org.ru/forum  тема <fork или thread> : т.е., в первую очередь, автор приносит свои благодарности всем, принявшим участие в обсуждении. Во-вторых - все обсуждавшие данную тему в форуме, являются соавторами предлагаемого материала в той же мере, как и автор, указанный в титуле статьи, а их полное поимённое перечисление : я опускаю только в силу их многочисленности.

Источник: http://codenet.ru

Оригинальный DVD-ROM: eXeL@B DVD !


Вы находитесь на EXELAB.rU
Проект ReactOS