Выбрать главу

DWORD WINAPI consumer(PVOID);

DWORD WINAPI transmitter(PVOID);

DWORD WINAPI receiver(PVOID);

typedef struct _THARG {

 volatile DWORD thread_number;

 volatile DWORD work_goal; /* Используется потоками производителей. */

 volatile DWORD work_done; /* Используется потоками производителей и потребителей. */ '

 char future[8]; 

} THARG;

/* Сгруппированные сообщения, посылаемые передающим потоком потребителю.*/

typedef struct t2r_msg_tag {

 volatile DWORD num_msgs; /* Количество содержащихся сообщений. */

 msg_block_t messages[TBLOCK_SIZE];

} t2r_msg_t;

queue_t p2tq, t2rq, *r2cq_array;

static volatile DWORD ShutDown = 0;

static DWORD EventTimeout = 50;

DWORD _tmain(DWORD argc, LPTSTR * argv[]) {

 DWORD tstatus, nthread, ithread, goal, thid;

 HANDLE *producer_th, *consumer_th, transmitter_th, receiver_th;

 THARG *producer_arg, *consumer_arg;

 nthread = atoi(argv[1]);

 goal = atoi(argv[2]);

 producer_th = malloc(nthread * sizeof(HANDLE));

 producer_arg = calloc(nthread, sizeof(THARG));

 consumer_th = malloc(nthread * sizeof(HANDLE));

 consumer_arg = calloc(nthread, sizeof(THARG));

 q_initialize(&p2tq, sizeof(msg_block_t), P2T_QLEN); 

 q_initialize(&t2rq, sizeof(t2r_msg_t), T2R_QLEN);

 /* Распределить ресурсы, инициализировать очереди "принимающий поток/потребитель" для каждого потребителя. */

 r2cq_array = calloc(nthread, sizeof(queue_t));

 for (ithread = 0; ithread < nthread; ithread++) {

  /* Инициализировать очередь r2с для потока данного потребителя. */

  q_initialize(&r2cq_array[ithread], sizeof(msg_block_t), R2C_QLEN);

  /* Заполнить аргументы потока. */

  consumer_arg[ithread].thread_number = ithread;

  consumer_arg[ithread].work_goal = goal;

  consumer_arg[ithread].work_done = 0;

  consumer_th[ithread] = (HANDLE)_beginthreadex(NULL, 0, consumer, (PVOID)&consumer_arg[ithread], 0, &thid);

  producer_arg[ithread].thread_number = ithread;

  producer_arg[ithread].work_goal = goal;

  producer_arg[ithread].work_done = 0;

  producer_th[ithread] = (HANDLE)_beginthreadex(NULL, 0, producer, (PVOID)&producer_arg[ithread], 0, &thid);

 }

 transraitter_th = (HANDLE)_beginthreadex(NULL, 0, transmitter, NULL, 0, &thid);

 receiver_th = (HANDLE)_beginthreadex (NULL, 0, receiver, NULL, 0, &thid);

 _tprintf(_T("ХОЗЯИН: Выполняются все потоки\n"));

 /* Ждать завершения потоков производителя. */

 for (ithread = 0; ithread < nthread; ithread++) {

  WaitForSingleObject(producer_th[ithread], INFINITE);

  _tprintf(_T("ХОЗЯИН: производитель %d выработал %d единичных сообщений\n"), ithread, producer_arg[ithread].work_done);

 }

 /* Производители завершили работу. */

 _tprintf(_T("ХОЗЯИН: Все потоки производителя выполнили свою работу.\n"));

 /* Ждать завершения потоков потребителя. */

 for (ithread = 0; ithread < nthread; ithread++) {

  WaitForSingleObject(consumer_th[ithread], INFINITE);

  _tprintf(_T("ХОЗЯИН: потребитель %d принял %d одиночных сообщений\n"), ithread, consumer_arg[ithread].work_done);

 }

 _tprintf(_T("ХОЗЯИН: Все потоки потребителя выполнили свою работу.\n"));

 ShutDown = 1; /* Установить флаг завершения работы. */

 /* Завершить выполнение и перейти в состояние ожидания передающих и принимающих потоков. */

 /* Эта процедура завершения работает нормально, поскольку и передающий,*/

 /* и принимающий потоки не владеют иными ресурсами, кроме мьютекса, */

 /* которые они могли бы покинуть по завершении выполнения, не уступив прав владения ими. Можете ли вы улучшить эту процедуру? */ 

 TerminateThread(transmitter_th, 0);

 TerminateThread(receiver_th, 0);

 WaitForSingleObject(transmitter_th, INFINITE);

 WaitForSingleObject(receiver_th, INFINITE);

 q_destroy(&p2tq);

 q_destroy(&t2rq);

 for (ithread = 0; ithread < nthread; ithread++) q_destroy(&r2cq_array [ithread]);

 free(r2cq_array);

 free(producer_th);

 free(consumer_th);

 free(producer_arg);

 free(consumer_arg);

 _tprintf(_T("Система завершила работу. Останов системы\n"));

 return 0;

}

DWORD WINAPI producer(PVOID arg) {

 THARG * parg;

 DWORD ithread, tstatus;

 msg_block_t msg;

 parg = (THARG *)arg;

 ithread = parg->thread_number;

 while (parg->work_done < parg->work_goal) {

  /* Вырабатывать единичные сообщения, пока их общее количество */

  /* не станет равным "goal". */

  /* Сообщения снабжаются адресами отправителя и адресата, которые в */

  /* нашем примере одинаковы для всех сообщений, но в общем случае */

  /* могут быть различными. */

  delay_cpu(DELAY_COUNT * rand() / RAND_MAX);

  message_fill(&msg, ithread, ithread, parg->work_done);

  /* Поместить сообщение в очередь. */

  tstatus = q_put(&p2tq, &msg, sizeof(msg), INFINITE);

  parg->work_done++;

 }

 return 0;

}

DWORD WINAPI transmitter(PVOID arg) {

 /* Получись несколько сообщений от производителя, объединяя их в одно*/

 /* составное сообщение, предназначенное для принимающего потока. */

 DWORD tstatus, im;

 t2r_msg_t t2r_msg = {0};

 msg_block_t p2t_msg;

 while (!ShutDown) {

  t2r_msg.num_msgs = 0;

  /* Упаковать сообщения для передачи принимающему потоку. */

  for (im = 0; im < TBLOCK_SIZE; im++) {

   tstatus = q_get(&p2tq, &p2t_msg, sizeof(p2t_msg), INFINITE); 

   if (tstatus != 0) break;

   memcpy(&t2r_msg.messages[im], &p2t_msg, sizeof(p2t_msg));

   t2r_rasg.num_msgs++;

  }

  tstatus = q_put(&t2rq, &t2r_msg, sizeof(t2r_msg), INFINITE);

  if (tstatus != 0) return tstatus;

 }

 return 0;

}

DWORD WINAPI receiver(PVOID arg) {

 /* Получить составные сообщения от передающего потока; распаковать */