Выбрать главу

       bool bNotify = false;

       lock (_workItemQueue) {

             if (_workItemQueue.Count >= 1) {

                 nextWork = (Workltem) _workltemQueue.Peek();

                 bNotify = true;

                  nextWork.SetSignaled();

              }

              else {

                    _locked = false;

              }

         }

          if (bNotify) {

                if (nextWork.IsAsync()) {

                      _asyncWorkEvent.Set ();

                }

                else {

                         ……

                }

        }

}

Критическая секция

lock (workItemQueue) {

……

}

уже была рассмотрена ранее.

Пусть теперь в начале очереди находится асинхронная работа (nextWork). В этом случае событие asyncWorkEvent устанавливается в состояние signaled и на этом вся подготовка к обработке новой работы завершается.

Перехват исходящего вызова

Формирование перехватчика исходящих вызовов

Напомним, что с каждым контекстом может быть связано несколько цепочек перехватчиков. Формирование связанного со свойством синхронизации перехватчика входящих вызовов было рассмотрено в предыдущем разделе. Теперь рассмотрим формирование перехватчика исходящих вызовов.

Класс SynchronizationAttribute реализует интерфейс IContributeClientContextSink.

Благодаря этому факту, при формировании нового контекста синхронизации автоматически вызывается метод GetClientContextSink, объявленный в данном интерфейсе, который и формирует перехватчик исходящих вызовов для данного контекста.

Зачем нужен перехватчик исходящих вызовов? Предположим, контекст (домен) синхронизации реентерабельный. Это означает, что с того момента, когда поток, исполняющий некоторый вызов в данном контексте, инициировал вызов за пределы этого контекста и вошел в состоянии ожидания ответа, очередная работа может быть извлечена из очереди и может начаться выполнение инкапсулированного в ней вызова. Перехватчик исходящих вызовов как раз и замечает момент выдачи внешнего вызова и инициирует обработку очередной работы.

Ниже приводится кодметода GetClientContextSink из Rotor:

public virtual IMessageSink GetClientContextSink (

     IMessageSink nextSink) {

      InitlfNecessary();

      SynchronizedClientContextSink propertySink =

           new SynchronizedClientContextSink (

                  this,

                  nextSink);

       return (IMessageSink) propertySink;

}

Этот код аналогичен коду метода GetServerContextSink, в связи с чем комментарии опущены.

Как и в случае перехватчика входящих вызовов, при одном на весь домен синхронизации свойстве синхронизации, для каждого контекста в этом домене формируется свой перехватчик исходящих вызовов, имеющий ссылку на это свойство синхронизации.

Класс SynchronizedClientContextSink наследует классу InternalSink и реализует интерфейс IMessageSink. Его основная функциональность определяется двумя методами интерфейса IMessageSink: SyncProcessMessage и AsyncProcessMessage, обрабатывающими соответственно синхронные и асинхронные исходящие вызовы.

Перехват исходящих синхронных вызовов

Случай реентерабельного контекста

Начнем со случая реентерабельного контекста (домена). Вот соответствующая ветвь кода метода SyncProcessMessage:

public virtual IMessage SyncProcessMessage(

         IMessage reqMsg) {

         IMessage repiyMsg;

         if (_property.IsReEntrant) {

              _property.HandleThreadExit();

              replyMsg = _nextSink.SyncProcessMessage(reqMsg);

              _property.HandleThreadReEntry();

          }

          else {

                 ……

          }

          return replyMsg;

}

Прежде всего нужно уведомить свойство синхронизации (_property)

_property.HandleThreadExit();

о том, что выполняется вызов за пределы текущего контекста. Это позволит свойству синхронизации инициировать выполнение очередной работы. Рассмотрим код соответствующего метода HandleThreadExit класса SynchronizationAttribute:

internal virtual void HandleThreadExit() {

      HandleWorkCompletion();

}

Код для HandleWorkCompletion уже рассматривался. В результате его выполнения будет проверено состояние очереди работ. Если она не пуста, то очередная работа будет помечена флагом готовности к выполнению. В противном случае домен синхронизации будет разблокирован, что просто означает возможность выполнения вновь поступившего синхронного вызова без записи в очередь. Далее в случае наличия готовой к выполнению работы ее выполнение инициируется. В случае асинхронной работы для этого достаточно перевести событие _asyncWorkEvent в состояние signaled, а в случае синхронной — разбудить занятый ее выполнением процесс путем вызова Monitor.Pulse (nextWork), где nextWork — ссылка на готовую к выполнению синхронную работу.

Теперь вызов reqMsg в форме сообщения пересылается следующему перехватчику в цепочке перехватчиков исходящих вызовов для данного контекста. Текущий поток блокируется в ожидании ответа.

replyMsg = _nextSink.SyncProcessMessage(reqMsg);

После получения ответа на внешний вызов, текущий поток не может безоглядно продолжить выполнение основного вызова, так как в связи с реентерабельностью контекста (домена), возможно, в данном домене уже выполняется какой-либо другой поток. Таким образом, текущий поток должен ожидать своей очереди. Как ему встать в эту очередь? Можно воспользоваться тем, что свойство синхронизации уже поддерживает одну очередь — очередь работ. Можно создать фиктивную работу, включив в нее только информацию о контексте, где этот поток должен выполняться, и о контексте вызова, связанного с этим потоком. Информацию о самом вызове в работу включать не надо, так как этот поток уже находится в состоянии его выполнения. После постановки фиктивной работы в очередь данный поток заснет и будет разбужен только тогда, когда эта фиктивная работа окажется в очереди работ на первом месте.

Вся вышеописанная логика запускается следующим вызовом:

_property.HandleThreadReEntry();

Ниже приводится код для метода HandleThreadReEntry класса

SynchronizationAttribute:

       internal virtual void HandleThreadReEntry() {

                Workltem work = new Workltem(null, null, null);

                work.SetDummy();

                 HandieWorkRequest(work);

        }

Здесь создается фиктивная работа, помечается флагом фиктивности и ставится в очередь в процессе выполнения кода HandleWorkRequest.

Последний метод уже неоднократно обсуждался, но теперь имеет смысл особо рассмотреть ту его ветвь, которая связана с обработкой фиктивной работы. По умолчанию эта работа синхронна. Кроме того, в связи с реентерабельностью контекста, инкапсулированный в ней вызов не вложенный.

internal virtual void HandleWorkRequest(Workitem work) {

       bool bQueued;

       if (!IsNestedCall(work._reqMsg)) {

           if (work.IsAsync()) {

                ……

            }

            else {

                lock(work) {

                           lock(_workltemQueue) {

                               if ((!_locked) &&

                                     (_workltemQueue.Count == 0)) {

                                      _locked = true;

                                      bQueued = false;

                               }

                               else {

                                      bQueued = true;

                                      work.Setwaiting();

                                       _workltemQueue.Enqueue(work);

                                 }