Cilia/Getting Started Tutorial/Developing Scheduler Dispatcher


Developing a Scheduler and a Dispatcher

In a previous lesson we learned how to use different kinds of schedulers to modify the behavior of mediators. In this lesson we will learn how to create new kinds of schedulers and dispatchers to obtain a specific behavior that the components shipped with Cilia do not provide.

Chain Description

This chain has a mediator instance that waits for two numbers; when both have arrived, the processor adds them. A routing decision is then made based on the result of the addition: if the sum is odd, the data is dispatched through an “odd” port; otherwise it is dispatched through an “even” port. With this example we show how the scheduler and the dispatcher can change the behavior of a mediator. The synchronization task of the NumberAddition scheduler consists of waiting for a data item on each of the mediator’s ports. The Dispatcher, in turn, decides where to send the data based on the content of the message (odd or even).


Tp4-chain.png

We will focus on the NumberAddition mediator, since the other mediator type, NumberConverter, was presented in the previous trail.

Implementing a Cilia Scheduler

In Cilia, a Scheduler implementation class should extend the CiliaScheduler class (provided by the framework). The Cilia framework invokes the notifyData method each time a Data arrives at one of the mediator’s ports. The data synchronization logic must be implemented in this method. The scheduler implementation is also responsible for deciding when to invoke the process method, which tells the Cilia framework that the Data is ready to be processed.

In this example we will implement a Scheduler that waits for a Data on each mediator port and then triggers the data processing. The Scheduler is implemented by the WaitAllScheduler class; its source code is as follows:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * This class implements a scheduler that waits for a data in each one of mediator's ports.
 */
public class WaitAllScheduler extends CiliaScheduler {

   // Last data received on each port; a null value means the port is still waiting.
   private Map<String, Data> dataMap = new HashMap<String, Data>();

   @Override
   public synchronized void notifyData(Data data) {

      // Determines the source port of the arriving data
      String currentPort = data.getSource();

      dataMap.put(currentPort, data);

      // NOTE: initDataMap() is not shown in this listing. The check below only works
      // if dataMap already holds one entry (initially null) per in-port, so
      // initDataMap() is assumed to (re)create those entries.
      boolean dataInAllOthersPorts = true;
      for (String portName : dataMap.keySet()) {
         if (!portName.equals(currentPort)) {
            Data portData = dataMap.get(portName);
            dataInAllOthersPorts = dataInAllOthersPorts && (portData != null);
         }
      }

      // If there is a data in each port of the mediator the process method is invoked.
      if (dataInAllOthersPorts) {
         List<Data> dataToProcess = new ArrayList<Data>(dataMap.values());
         initDataMap();
         process(dataToProcess); // Data processing triggering
      }
   }
}
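The wait-all synchronization can also be tried outside the framework. The following is a minimal plain-Java sketch, not Cilia code: the class WaitAllBuffer and its offer method are hypothetical names introduced for illustration, and the list of port names is assumed to be known up front.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Plain-Java sketch of "wait for one value on every port".
 * Not a Cilia class: the name and API are hypothetical.
 */
class WaitAllBuffer<T> {

   // One slot per expected port; null means "nothing received yet".
   private final Map<String, T> slots = new LinkedHashMap<String, T>();

   WaitAllBuffer(List<String> portNames) {
      for (String port : portNames) {
         slots.put(port, null);
      }
   }

   /**
    * Stores the value for the given port. Once every port holds a value,
    * returns the complete batch (in port declaration order) and clears the
    * slots; returns null while the batch is still incomplete.
    */
   synchronized List<T> offer(String port, T value) {
      if (!slots.containsKey(port)) {
         throw new IllegalArgumentException("Unknown port: " + port);
      }
      slots.put(port, value);
      for (T slot : slots.values()) {
         if (slot == null) {
            return null; // at least one port is still waiting
         }
      }
      List<T> batch = new ArrayList<T>(slots.values());
      for (Map.Entry<String, T> entry : slots.entrySet()) {
         entry.setValue(null); // reset for the next round
      }
      return batch;
   }
}
```

In this sketch a second value arriving on the same port before the batch is complete overwrites the first one, which matches the put-based bookkeeping of the listing above.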

The description of the Scheduler is similar to that of a processor: the name of the scheduler and the class implementing it must be given, while the namespace is an optional attribute. For more details see the Cilia Reference Guide.

   <scheduler name="wait-all-scheduler" classname="cilia.examples.scheduler.WaitAllScheduler"
      namespace="fr.liglab.adele.cilia">
   </scheduler>

Implementing a Cilia Dispatcher

In this example we will implement a Dispatcher that inspects the data content: if it is an odd number, it sends the data through the “odd” port; otherwise it sends the data through the “even” port. The Dispatcher is implemented by the NumberDispatcher class; its source code is as follows:

/**
 * Dispatcher implementation based on the data content (odd or even numbers)
 */
public class NumberDispatcher extends CiliaDispatcher {

   /**
    * Name of port used to send the odd number
    */
   private String oddPort;

   /**
    * Name of port used to send the even number
    */
   private String evenPort;

   @Override
   public void dispatch(List dataset) throws CiliaException {
      if (dataset == null || dataset.size() < 1) {
         log.log(CiliaLogger.WARNING, "Data list is null or empty when dispatching");
         return;
      }
      for (int i = 0; i < dataset.size(); i++) {
         Data data = (Data) dataset.get(i);
         String content = (String) data.getContent();
         long number = Long.parseLong(content);
         if (number % 2 != 0) {
            send(oddPort, data); // Sending data using the odd port
         } else {
            send(evenPort, data); // Sending data using the even port
         }
      }
   }

}
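The routing decision itself can be isolated from the framework and checked on its own. The sketch below is plain Java under stated assumptions: OddEvenRouter and portFor are hypothetical names, and the content is assumed to be a decimal string, as in the dispatcher above.

```java
/**
 * Plain-Java sketch of the odd/even routing decision.
 * Not a Cilia class: the name and method are hypothetical.
 */
final class OddEvenRouter {

   private final String oddPort;
   private final String evenPort;

   OddEvenRouter(String oddPort, String evenPort) {
      this.oddPort = oddPort;
      this.evenPort = evenPort;
   }

   /** Returns the name of the port that the given numeric content should leave by. */
   String portFor(String content) {
      long number = Long.parseLong(content.trim());
      // number % 2 is -1 for negative odd values, so the test compares against 0
      // rather than against 1.
      return (number % 2 == 0) ? evenPort : oddPort;
   }
}
```

Comparing the remainder with 0 rather than 1 keeps negative odd numbers on the odd port, because the Java remainder operator keeps the sign of the dividend.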

The description of the Dispatcher is similar to that of a processor: the name of the dispatcher and the class implementing it must be given, while the namespace is an optional attribute. For more details see the Cilia Reference Guide.

The implementation class must be told which ports to use for sending the numbers. Two properties are therefore defined, oddPort and evenPort. They are set when the mediation chain is defined; their default values are "odd" and "even" respectively.

   <dispatcher name="odd-even-dispatcher" classname="cilia.examples.dispatcher.NumberDispatcher"
      namespace="fr.liglab.adele.cilia">
      <properties>
         <property name="oddPort" field="oddPort" value="odd" />
         <property name="evenPort" field="evenPort" value="even" />
      </properties>
   </dispatcher>


Mediator Implementation

Processor Implementation

This processor takes a List of Data (each Data contains a long number) and computes the sum of the elements of the list. The source code is as follows:

/**
 * This processor takes two numbers (long) as parameter and returns the addition of them.
 */
public class NumberAddition {

   /**
    * The process method
    * @param list List containing two numbers.
    * @return a list containing a Data with the result
    */
   public List<Data> process (List<Data> list) {
      long result = 0;
      for (Data data : list) {
         String content = (String)data.getContent();
         Long number = null;
         try {
            number = Long.decode(content);
         } catch (Exception ex) {
            number = Long.valueOf(-1);
         }
         if (number.longValue() < 0) {
            return null;
         }
         result += number;
      }
      // List creation
      List<Data> newList = new ArrayList<Data>();
      newList.add(new Data(Long.toString(result)));
      return newList;
   }

}
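The parsing rules of the processor are easier to see without the Data wrapper. The following plain-Java sketch mirrors the logic above on raw strings; AdditionSketch and sum are hypothetical names, and, unlike the listing above, only NumberFormatException is caught.

```java
import java.util.List;

/**
 * Plain-Java sketch of the addition logic, working on raw strings.
 * Not part of the Cilia example: the name and method are hypothetical.
 */
final class AdditionSketch {

   /** Returns the sum of the items, or null if any item is unparsable or negative. */
   static Long sum(List<String> contents) {
      long result = 0;
      for (String content : contents) {
         long number;
         try {
            // Long.decode accepts decimal, hexadecimal ("0x10") and octal ("010") input.
            number = Long.decode(content);
         } catch (NumberFormatException ex) {
            return null; // unparsable input rejects the whole batch
         }
         if (number < 0) {
            return null; // negative input rejects the whole batch
         }
         result += number;
      }
      return Long.valueOf(result);
   }
}
```

Because Long.decode is used, a string with a leading zero such as "010" is read as octal (8), which may be surprising for values typed by a user.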

The processor description indicates that it takes a java.util.List as parameter.

   <processor name="NumberAdditionProcessor" classname="cilia.examples.mediator.filter.NumberAddition">
      <cilia:method name="process" data.type="java.util.List" />
   </processor>

Mediator Description

The mediator is created using the three elements presented above: the wait-all-scheduler Scheduler, the odd-even-dispatcher Dispatcher, and the NumberAdditionProcessor processor. The description is as follows:

   <mediator-component name="NumberAdditionMediator" category="samples">
      <cilia:processor name="NumberAdditionProcessor" />
      <cilia:scheduler name="wait-all-scheduler" />
      <cilia:dispatcher name="odd-even-dispatcher" />
   </mediator-component>

Chain Description

This chain receives the data from two GUI windows (the gui-adapter instances entryAdapter-1 and entryAdapter-2). Each window is connected to an input port (in1 and in2) of the NumberAddition mediator instance (numberAddition). Next, the numberAddition instance is connected to two instances of the NumberConverterMediator, fr-numberConverter and en-numberConverter, through its output ports odd and even respectively. The odd port is bound to the fr-numberConverter instance, which produces the text representation of the number in French. The even port is bound to the en-numberConverter instance, which produces the text representation of the number in English.

Tp4-chain.png

The description of the chain is as follows:

<cilia>
  <chain id="ConverterChain" type="type1">

    <!-- Adapters instances definition -->
    <adapters>
      <adapter-instance type="gui-adapter" id="entryAdapter-1" />
      <adapter-instance type="gui-adapter" id="entryAdapter-2" />
      <adapter-instance type="console-adapter" id="exitAdapter" />
    </adapters>

    <!-- Mediators instances definition -->
    <mediators>
      <mediator-instance type="NumberAdditionMediator" id="numberAddition">
        <ports>
          <in-port name="in1" />
          <in-port name="in2" />
          <out-port name="odd" />
          <out-port name="even" />
        </ports>
        <property name="oddPort" value="odd" />
        <property name="evenPort" value="even" />
      </mediator-instance>
      <mediator-instance type="NumberConverterMediator" id="fr-numberConverter">
        <ports>
          <in-port name="in" />
          <out-port name="out" />
        </ports>
        <property name="requires.filters">
          <property name="language" value="(language=french)" />
        </property>
      </mediator-instance>
      <mediator-instance type="NumberConverterMediator" id="en-numberConverter">
        <ports>
          <in-port name="in" />
          <out-port name="out" />
        </ports>
        <property name="requires.filters">
          <property name="language" value="(language=english)" />
        </property>
      </mediator-instance>
    </mediators>

    <!-- Bindings definition -->
    <bindings>
      <binding from="entryAdapter-1" to="numberAddition:in1" />
      <binding from="entryAdapter-2" to="numberAddition:in2" />
      <binding from="numberAddition:odd" to="fr-numberConverter:in" />
      <binding from="numberAddition:even" to="en-numberConverter:in" />
      <binding from="fr-numberConverter:out" to="exitAdapter" />
      <binding from="en-numberConverter:out" to="exitAdapter" />
    </bindings>
    
  </chain>

</cilia>
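To see how the bindings above determine where a result ends up, the route of a sum can be traced by hand. The sketch below is plain Java and does not use Cilia: ChainTrace and trace are hypothetical names, and the two map entries are copied from the bindings section of the chain description.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Plain-Java sketch that traces a pair of inputs through the chain bindings.
 * Not a Cilia class: the name and method are hypothetical.
 */
final class ChainTrace {

   // "from" endpoint -> "to" endpoint, copied from the chain description.
   static final Map<String, String> BINDINGS = new LinkedHashMap<String, String>();
   static {
      BINDINGS.put("numberAddition:odd", "fr-numberConverter:in");
      BINDINGS.put("numberAddition:even", "en-numberConverter:in");
   }

   /** Describes which output port and which converter the sum of a and b reaches. */
   static String trace(long a, long b) {
      long sum = a + b;
      String outPort = (sum % 2 == 0) ? "even" : "odd";
      String target = BINDINGS.get("numberAddition:" + outPort);
      return sum + " -> numberAddition:" + outPort + " -> " + target;
   }
}
```

For example, the inputs 3 and 4 give the sum 7, which leaves through the odd port and reaches fr-numberConverter:in.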

Chain Execution

Deploy the bundle containing the dispatcher, scheduler, processor, and mediator code and descriptions. Then deploy the chain description file tp4.dscilia.

Tp4-execution.png

Example Resources

Source code in the SVN repository

svn checkout svn://svn.ligforge.imag.fr/svnroot/cilia/trunk/examples/cilia-tp4-projects

svn checkout https://svn.ligforge.imag.fr/svnroot/cilia/trunk/examples/cilia-tp4-projects

Browse the SVN code here.

Snapshot of source code

Get a snapshot of the SVN project here.

List of compiled artefacts

Scheduler, dispatcher and processor bundle

Chain File