Выбрать главу

95     goto pthreaderr:

96    if ((i = pthread_condattr_init(&cattr)) != 0)

97     goto pthreaderr;

98    pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED);

99    i = pthread_cond_init(&mqhdr->mqh_wait, &cattr);

100   pthread_condattr_destroy(&cattr); /* обязательно нужно удалить */

101   if (i != 0)

102    goto pthreaderr;

103   /* инициализация завершена, снимаем бит user-execute */

104   if (fchmod(fd, mode) == –1)

105    goto err;

106   close(fd);

107   return((mymqd_t) mqinfo);

108  }

Установка размера файла

51-58 Вычисляется размер сообщения, который затем округляется до кратного размеру длинного целого. Также в файле отводится место для структуры mq_hdr в начале файла и msghdr в начале каждого сообщения (рис. 5.2). Размер вновь созданного файла устанавливается функцией lseek и записью одного байта со значением 0. Проще было бы вызвать ftruncate (раздел 13.3), но у нас нет гарантий, что это сработало бы для увеличения размера файла.

Отображение файла в память

59-63 Файл отображается в память функцией mmap.

Выделение памяти под структуру mq_info

64-66 При каждом вызове mq_open создается отдельный экземпляр mq_infо. Эта структура после создания инициализируется.

Инициализация структуры mq_hdr

67-87 Инициализируется структура mq_hdr. Заголовок связного списка сообщений (mqh_head) инициализируется нулем, а все сообщения в очереди добавляются к списку свободных (mqh_frее).

Инициализация взаимного исключения и условной переменной

88-102 Поскольку очереди сообщений Posix могут использоваться совместно произвольным количеством процессов, которые знают имя очереди и имеют соответствующие разрешения, нам нужно инициализировать взаимное исключение и условную переменную с атрибутом PTHREAD_PROCESS_SHARED. Для этого мы сначала инициализируем атрибуты вызовом pthread_mutexattr_init, а затем устанавливаем значение атрибута совместного использования процессами, вызвав pthread_mutexattr_setpshared. После этого взаимное исключение инициализируется вызовом pthread_mutex_init. Аналогичные действия выполняются для условной переменной. Мы должны аккуратно удалить взаимное исключение и условную переменную даже при возникновении ошибки, поскольку вызовы pthread_ mutexattr_init и pthread_condattr_init выделяют под них память (упражнение 7.3).

Сброс бита user-execute

103-107 После инициализации очереди сообщений мы сбрасываем бит user-execute. Это говорит другим процессам о том, что очередь была проинициализирована. Мы также закрываем файл вызовом close, поскольку он был успешно отображен в память и держать его открытым больше нет необходимости.

В листинге 5.19 приведен конец функции mq_open, в котором осуществляется открытие существующей очереди сообщений.

Листинг 5.19. Третья часть функции mq_open: открытие существующей очереди сообщений

//my_pxmsg_mmap/mq_open.с

109 exists:

110  /* открытие файла и отображение его в память */

111  if ((fd = open(pathname, O_RDWR)) < 0) {

112   if (errno == ENOENT && (oflag & O_CREAT))

113    goto again;

114  goto err;

115  }

116  /* проверяем, что инициализация завершена */

117  for (i = 0; i < MAX TRIES; i++) {

118   if (stat(pathname, &statbuff) == –1) {

119    if (errno == ENOENT && (oflag & O_CREAT)) {

120     close(fd);

121     goto again;

122    }

123    goto err;

124   }

125   if ((statbuff.st_mode & S_IXUSR) == 0)

126   break;

127   sleep(1);

128  }

129  if (i == MAX_TRIES) {

130   errno = ETIMEDOUT;

131   goto err;

132  }

133  filesize = statbuff.st_size;

134  mptr = mmap(NULL, filesize, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);

135  if (mptr == MAP_FAILED)

136   goto err;

137  close(fd);

138  /* выделяем одну mymq_info{} для каждого вызова open */

139  if ((mqinfo = malloc(sizeof(struct mymq_info))) == NULL)

140   goto err;

141  rnqinfo->mqi_hdr = (struct mymq_hdr *) mptr;

142  mqinfo->mqi_magic = MQI_MAGIC;

143  mqinfo->mqi_flags = nonblock;

144  return((mymqd_t) mqinfo);

145 pthreaderr:

146  errno = i;

147 err:

148  /* не даем следующим вызовам изменить errno */

149  save_errno = errno;

150  if (created)

151   unlink(pathname);

152  if (mptr != MAP_FAILED)

153   munmap(mptr, filesize);

154  if (mqinfo != NULL)

155   free(mqinfo);

156  close(fd);

157  errno = save_errno;

158  return((mymqd_t) –1);

159 }

Открытие существующей очереди сообщений

109-115 Здесь мы завершаем работу, если флаг O_CREAT не был указан или если он был указан, но очередь уже существовала. В любом случае, мы открываем существующую очередь сообщений. Для этого мы открываем для чтения и записи файл, в котором она содержится, функцией open и отображаем его содержимое в адресное пpocтрaнcтвo процесса (mmap).

ПРИМЕЧАНИЕ

Наша реализация сильно упрощена в том, что касается режима открытия файла. Даже если вызвавший процесс указывает флаг O_RDONLY, мы должны дать возможность доступа для чтения и записи при открытии файла командой open и при отображении его в память командой mmap, поскольку невозможно считать сообщение из очереди, не изменив содержимое файла. Аналогично невозможно записать сообщение в очередь, не имея доступа на чтение. Обойти эту проблему можно, сохранив режим открытия (O_RDONLY, O_WRONLY, O_RDWR) в структуре mq_info и проверяя этот режим в каждой из функций. Например, mq_receive должна возвращать ошибку, если в mq_info хранится значение O_WRONLY.

Проверка готовности очереди

116-132 Нам необходимо дождаться, когда очередь будет проинициализирована (в случае, если несколько потоков сделают попытку открыть ее приблизительно одновременно). Для этого мы вызываем stat и проверяем разрешения доступа к файлу (поле st_mode структуры stat). Если бит user-execute сброшен, очередь уже проинициализирована.

Этот участок кода обрабатывает другую возможную ситуацию гонок. Предположим, что два потока разных процессов попытаются открыть очередь приблизительно одновременно. Первый поток создает файл и блокируется при вызове lseek в листинге 5.18. Второй поток обнаруживает, что файл уже существует, и переходит к метке exists, где он вновь открывает файл функцией open и при этом блокируется. Затем продолжается выполнение первого потока, но его вызов mmap в листинге 5.18 не срабатывает (возможно, он превысил лимит использования памяти), поэтому он переходит на метку err и удаляет созданный файл вызовом unlink. Продолжается выполнение второго потока, но если бы мы вызывали fstat вместо stat, он бы вышел по тайм-ауту в цикле for, ожидая инициализации файла. Вместо этого мы вызываем stat, которая возвращает ошибку, если файл не существует, и, если флаг O_CREAT был указан при вызове mq_open, мы переходим на метку again (листинг 5.17) для повторного создания файла. Эта ситуация гонок заставляет нас также проверять, не возвращается ли при вызове open ошибка ENOENT.