Выбрать главу
Ожидание установки флага обработчиком

28-32 Мы вызываем sigprocmask, чтобы заблокировать SIGUSR1, сохраняя текущую маску сигналов в oldmask. Затем мы в цикле проверяем значение глобального флага mqflag, ожидая, когда обработчик сигнала установит его в ненулевое значение. Пока значение этого флага равно нулю, мы вызываем sigsuspend, что автоматически приостанавливает вызывающий поток и устанавливает его маску в zeromask (сигналы не блокируются). Раздел 10.16 [21] рассказывает о функции sigsuspend более подробно. Также там объясняются причины, по которым мы должны проверять значение переменной mqflag только при заблокированном сигнале SIGUSR1. Каждый раз при выходе из sigsuspend сигнал SIGUSR1 блокируется.

Перерегистрация и считывание сообщения

33-36 Когда флаг mqflag принимает ненулевое значение, мы регистрируемся на получение уведомления заново и считываем сообщение из очереди. Затем мы разблокируем сигнал SIGUSR1 и возвращаемся к началу цикла.

Мы уже говорили, что в этой версии программы также присутствует ошибка. Посмотрим, что произойдет, если в очередь попадут два сообщения, прежде чем будет считано первое из них. Мы можем имитировать это, добавив sleep перед вызовом mq_notify. Проблема тут в том, что уведомление отсылается только в том случае, когда сообщение помещается в пустую очередь. Если в очередь поступают два сообщения, прежде чем первое будет считано, то отсылается только одно уведомление. Тогда мы считываем первое сообщение и вызываем sigsuspend, ожидая поступления еще одного. А в это время в очереди уже имеется сообщение, которое мы должны прочитать, но которое мы никогда не прочтем.

Пример: уведомление сигналом с отключением блокировки

Исправить описанную выше ошибку можно, отключив блокировку операции считывания сообщений. Листинг 5.10 содержит измененную версию программы из листинга 5.9. Новая программа считывает сообщения в неблокируемом режиме.

Листинг 5.10. Использование уведомления с помощью сигнала для считывания сообщения из очереди сообщений Posix

//pxmsg/mqnotifysig3.с

1  #include "unpipc.h"

2  volatile sig_atomic_t mqflag; /* ненулевое значение устанавливается обработчиком сигнала */

3  static void sig_usr1(int);

4  int

5  main(int argc, char **argv)

6  {

7   mqd_t mqd;

8   void *buff;

9   ssize_t n;

10  sigset_t zeromask, newmask, oldmask;

11  struct mq_attr attr;

12  struct sigevent sigev;

13  if (argc != 2)

14   err_quit("usage: mqnotifysig3 <name>");

15  /* открытие очереди, получение атрибутов, выделение буфера */

16  mqd = Mq_open(argv[1], O_RDONLY | O_NONBLOCK);

17  Mq_getattr(mqd, &attr);

18  buff = Malloc(attr.mq_msgsize);

19  Sigemptyset(&zeromask); /* сигналы не блокируются */

20  Sigemptyset(&newmask);

21  Sigemptyset(&oldmask);

22  Sigaddset(&newmask, SIGUSR1);

23  /* установка обработчика, включение уведомления */

24  Signal(SIGUSR1, sig_usr1);

25  sigev.sigev_notify = SIGEV_SIGNAL;

26  sigev.sigev_signo = SIGUSR1;

27  Mq_notify(mqd, &sigev);

28  for (;;) {

29   Sigprocmask(SIG_BLOCK, &newmask, &oldmask); /* блокируем SIGUSR1 */

30   while (mqflag == 0)

31    sigsuspend(&zeromask);

32   mqflag = 0; /* сброс флага */

33   Mq_notify(mqd, &sigev); /* перерегистрируемся */

34   while ((n = mq_receive(mqd, buff, attr.mq_msgsize, NULL)) >= 0) {

35    printf("read $ld bytes\n", (long) n);

36   }

37   if (errno != EAGAIN)

38    err_sys("mq_receive error");

39   Sigprocmask(SIG_UNBLOCK, &newmask, NULL); /* разблокируем SIGUSR1 */

40  }

41  exit(0);

42 }

43 static void

44 sig_usr1(int signo)

45 {

46  mqflag = 1;

47  return;

48 }

Открытие очереди сообщений в режиме отключенной блокировки

15-18 Первое изменение в программе: при открытии очереди сообщений указывается флаг O_NONBLOCK.

Считывание всех сообщений из очереди

34-38 Другое изменение: mq_receive вызывается в цикле, считывая все сообщения в очереди, пока не будет возвращена ошибка с кодом EAGAIN, означающая отсутствие сообщений в очереди. 

Пример: уведомление с использованием sigwait вместо обработчика

Хотя программа из предыдущего примера работает правильно, можно повысить ее эффективность. Программа использует sigsuspend для блокировки в ожидании прихода сообщения. При помещении сообщения в пустую очередь вызывается сигнал, основной поток останавливается, запускается обработчик, который устанавливает флаг mqflag, затем снова запускается главный поток, он обнаруживает, что значение mqflag отлично от нуля, и считывает сообщение. Более простой и эффективный подход заключается в блокировании в функции, ожидающей получения сигнала, что не требует вызова обработчика только для установки флага. Эта возможность предоставляется функцией sigwait:

#include <signal.h>

int sigwait(const sigset_t *set, int *sig);

/* Возвращает 0 в случае успешного завершения, –1 – в случае ошибки */

Перед вызовом sigwait мы блокируем некоторые сигналы. Набор блокируемых сигналов указывается в качестве аргумента set. Функция sigwait блокируется, пока не придет по крайней мере один из этих сигналов. Когда он будет получен, функция возвратит его. Значение этого сигнала сохраняется в указателе sig, а функция возвращает значение 0. Это называется синхронным ожиданием асинхронного события: мы используем сигнал, но не пользуемся асинхронным обработчиком сигнала.

В листинге 5.11 приведен текст программы, использующей mq_notifу и sigwait.

Листинг 5.11. Использование mq_notify совместно с sigwait

//pxmsg/mqnotifysig4.c

1  #include "unpipc.h"

2  int

3  main(int argc, char **argv)

4  {

5   int signo;

6   mqd_t mqd;

7   void *buff;

8   ssize_t n;

9   sigset_t newmask;

10  struct mq_attr attr;

11  struct sigevent sigev;

12  if (argc != 2)

13   err_quit("usage: mqnotifysig4 <name>");

14  /* открытие очереди, получение атрибутов, выделение буфера */

15  mqd = Mq_open(argv[1], O_RDONLY | O_NONBLOCK);

16  Mq_getattr(mqd, &attr);

17  buff = Malloc(attr.mq_msgsize);