Start - Publikationen - Wissen - TOGAF - Impressum -

Inbound Resource Adapter


Ein Inbound Resource Adapter besteht aus drei öffentlichen Klassen und Interfaces, (nicht öffentliche) Helperklassen und einem Deploymentdeskriptor. Ein Inbound Resource Adapters darf mehrere Message Listener Interfaces definieren (und dafür mehrere ActivationSpec Implementierungen bereitstellen). Der hier vorgestellte Adapter lauscht auf die Anfragen eines Telnet-Clients und implementiert ein triviales Kommunikationsprotokoll mit diesem. Fette Codestellen kennzeichnen notwendige Anpassungen bei eigenen Projekten.

Artefakt Bedeutung
TCPResourceAdapter Die Inbound Resource Adapter Implementierung reagiert auf die Lifecycle Ereignisse des Servers. Das sind insbesondere die Resource Adapter Instanziierung, das Setzen der JavaBean Parameter aus den Angaben im Deploymentdeskriptor, das Starten und Stoppen des Resource Adapters und die Registrierung und Deregistrierung von Message Listener Endpoints.
TCPMessageListener Das Message Listener Interface, die Schnittstelle für Message Listener Endpoints.
TCPActivationSpec Die ActivationSpec Implementierung, eine JavaBean für die Übertragung von Endpoint Konfigurationen an den ResourceAdapter.
ra.xml Der Deploymentdeskriptor des Adapters.

Das Message Listener Interface und die ActivationSpec Implementierung


Das MessageListener Interface definiert die Signatur von onMessage - im Beispiel wird ein String als Parameter erwartet:

/**
 * Die MDBs, die Nachrichten dieses Resource Adapters annehmen, implementieren
 * dieses Interface.
 */
public interface TCPMessageListener {
    void onMessage(String message);
}
Die ActivationSpec Implementierung soll den Adapter bei der Aktivierung mit Enpoint Parametern versorgen. Hier der Einfachheit halber wieder nur ein simpler String als Parameter:
/**
 * Mit einer AktivationSpec Instanz übergibt ein MessageEndpoint seine
 * Konfigurationsparameter bei der Endpoint Registrierung.
 *  
 * Die Implementierung ist JavaBean konform und der Resouerce Adapter DD kann  
 * mit dem required-config-property Element das Setzen entsprechender 
 * Konfigurationsparameter erzwingen - das ist der Weg, mit dem Endpoint 
 * Provider Informationen an den Resource Adapter weiterreichen.
 * In Section 16.4 der Spec ist gefordert, dass die Standard equals Implementierung
 * nicht überschrieben wird (no two Java objects are considerd equals). 
 * 
 * Ein Resource Adapter auch mehrere ActivationSpec Implementierungen für
 * verscheidene Message Listener Typen bereitstellen.
 */
public final class TCPActivationSpec implements ActivationSpec, Serializable {
  //
  private static final long serialVersionUID = -6475040405827259828L;
  private ResourceAdapter resourceAdapter = null;
  private Method messageListenerMethod = null;  
  //
  public ResourceAdapter getResourceAdapter() {
    return this.resourceAdapter;
  }
  //
  public void setResourceAdapter(ResourceAdapter resourceAdapter)
      throws ResourceException {
    this.resourceAdapter = resourceAdapter;
  }
  /**
   * Kann im Rahmen der Endpoint Aktivierung gerufen werden.
   * 
   * Hier kann noch ein letztes Mal der Inhalt der ActivationSpec
   * Parameter geprüft werden. Jedoch: nicht der Server ruft diese Methode,
   * sonder sie ist vom Resource Adapter zu rufen.
   */
  public void validate() throws InvalidPropertyException {
    // hier können zum Beispiel die vom Endpoint Provider übergebenen
    // Parameter geprüft werden..
    if (getMessage() == null || getMessage().trim().length() == 0) {
      throw new InvalidPropertyException("Endpoint Provider must provide a non-empty message!");
    }
  }
/** * Die Bereitstellung des Parameters "Messsage" wird im DD des RA erzwungen * durch das Element required-config-property. */ [*private String message = null; // public String getMessage() { return message; } // public void setMessage(String message) { this.message = message; }*] }

Die Inbound Resource Adapter Implementierung


Der Adapter verwirklicht den Lifecycle Contract zum JEE Server. Entscheidend ist die Implementierung von start, hier wird (in Form von Helperklassen) der Service Thread des Adapters gestartet und dieser läuft bis der Adapter undeployed wird:

/**
 * Inbound ResourceAdapter Komponente.
 * 
 * Sie eine JavaBean (implementiert deshalb Serializable),
 * die config-property Einträge des Deployment Descriptors
 * werden entsprechend an die RA Instanz übergeben.
 *
 * Diese Beispielimplementierung betrachtet Telnet-Clients als EIS
 * und implementiert ein triviales Kommunikationsprotokoll.
 */
public final class TCPResourceAdapter implements ResourceAdapter, Serializable {
  //
  private static final long serialVersionUID = -3660518154888161024L;
  private Work serverWork = null;
  /**
   * Der BootstrapContext kann als Member in der RA Instanz vorgehalten
   * werden - der Context muss für während der gesamten Lebensspanne des
   * RA intakt bleiben.
   */
  private BootstrapContext bootstrapContext = null;
  /**
   * Die config-property Einträge des DD werden hier eingetragen -
   * hier ist Beispielhaft der Port für den ServerSocket konfiguriert.
   */
  private int port = 1234;
  //
  public int getPort() {
    return port;
  }
  //
  public void setPort(int port) {
    this.port = port;
  }
  //
  private final Map<TCPActivationSpec, MessageEndpointFactory> factoryMap = 
    new HashMap<TCPActivationSpec, MessageEndpointFactory>();
  /**
   * Zugriff auf des Servers WorkManager Instanz.
   * 
   * @return Der mit dem BootstrapContext assoziierte WorkManager
   */
  WorkManager getWorkManager() {
    return this.bootstrapContext.getWorkManager();
  }
  /**
   * Der Server meldet dem Resource Adapter einen weiteren Endpunkt.
   * 
   * Das passiert faktisch immer dann, wenn eine MDB deployed wird, die
   * den messaging-type dieses Resource Adapters implementiert.
   * Die MessageEndpointFactory Instanz wird vom Server bereitgestelllt.  
   */
  public void endpointActivation(final MessageEndpointFactory mef,
      final ActivationSpec activationSpec) throws ResourceException {
    if (!(activationSpec instanceof TCPActivationSpec)) {
      throw new ResourceException("invalid activation spec type");
    }
    TCPActivationSpec tcpActivationSpec = (TCPActivationSpec) activationSpec;
    tcpActivationSpec.validate(); // kann eine ResourceException werfen
    synchronized(this.factoryMap) {
      this.factoryMap.put((TCPActivationSpec) activationSpec, mef);
    }
  }
  /**
   * Der Server deaktiviert einen Endpunkt.
   * 
   * Faktisch wurde eine MDB undeployed.
   */
  public void endpointDeactivation(MessageEndpointFactory mef,
      ActivationSpec activationSpec) {
    synchronized(this.factoryMap) {
      this.factoryMap.remove(activationSpec);
    }
  }
  /**
   * Teil des message inflow contracts: gibt ein Array von XAResource
   * Objekten, die anhand des Arrays von ActivationSpec Objekten erstellt
   * wird zurück.
   * 
   * @return Array von XAResource Objekten, null: transaction inflow wird
   * nicht unterstützt
   */
  public XAResource[] getXAResources(ActivationSpec[] activationSpecs)
      throws ResourceException {
    return null;
  }
  /**
   * Wird für MessageEndpoint.beforeDelivery gebraucht.
   *
   * @return Die Methode des MessageEndpoint
   */
  Method getMessageEndpointMethod() {
    return this.messageListenerMethod;
  }
  /**
   * Der Resource Adapter wird vom Server gestartet.
   * 
   * Der BootstrapContext wird gesichert und der Service Thread dieses
   * RA wird gestartet. Der Service Thread repräsentiert einen
   * ServerSocket (so wie in diesem Beispiel), ein DatagramSocket 
   * oder einen Polling Thread. In der Praxis sind diese Details von
   * einer Java API gekapselt. 
   */
  public void start(final BootstrapContext bootstrapContext)
      throws ResourceAdapterInternalException {
    /*
     * Laut Spek muss die BootstrapContext Instanz 
     * während der geamten Lebenszeit des ResourceAdapters
     * intakt beleiben:
     */
    this.bootstrapContext = bootstrapContext;
    /*
     * Wird für MessageEndpoint.beforeDelivery gebraucht.
     */
    try {
      this.messageListenerMethod =
        TCPMessageListener.class.getMethod("onMessage", new Class[] { String.class });
    } catch (SecurityException e) {
      throw new ResourceAdapterInternalException(e);
    } catch (NoSuchMethodException e) {
      throw new ResourceAdapterInternalException(e);
    }
    /*
     * Die eigentliche Arbeit darf aber nicht in diesem Thread durchgeführt
     * werden. Man muss den WorkManager des Servers nutzen.
     * 
     * Hier wird ein einfacher Worker gestartet, möglich sind viele andere
     * Optionen (siehe WorkerManager Interface)
     */
    final WorkManager workManager = bootstrapContext.getWorkManager();
    try {
      // Die Klasse ServerWork repräsentiert den Service einer
      // Java API und wird hier gestartet:
      this.serverWork = new ServerWork(this);
      try {
        workManager.startWork(this.serverWork);
      } catch (WorkException workException) {
        throw new ResourceAdapterInternalException(workException);
      }
    } catch (IOException ioException) {
      throw new ResourceAdapterInternalException(ioException);
    }
  }
  /**
   * Der Resource Adapter wird vom Server gestopt.
   * 
   * Alle Ressourcen werden hier wieder freigegeben.
   */
  public void stop() {
    if (this.serverWork != null) {
      this.serverWork.release();
    }
  }
  @Override
  public int hashCode() {
    final int PRIME = 31;
    int result = 1;
    result = PRIME * result + port;
    return result;
  }
  @Override
  public boolean equals(Object obj) {
    if (this == obj)
      return true;
    if (obj == null)
      return false;
    if (getClass() != obj.getClass())
      return false;
    final TCPResourceAdapter other = (TCPResourceAdapter) obj;
    if (port != other.port)
      return false;
    return true;
  }
  /**
   * Benachrichtigt alle angemeldeten MessageEndpoints
   * mit der gegebenen Nachricht.
   * 
   * Wir müssen an der factoryMap synchronisieren, damit nicht
   * deaktivierte Endpoints benachrichtigt werden. Da nur Nachrichten
   * zugestellt werden sollen wird der betroffene Block aber schnell
   * durchlaufen.
   *  
   * @param message Die Nachricht
   */
  void sendMessage(String message) {
    //
    synchronized(this.factoryMap) {
      for (TCPActivationSpec activationSpec : this.factoryMap.keySet()) {
        //
        MessageEndpointFactory factory = factoryMap.get(activationSpec);
        MessageEndpoint endpoint = null;
        try {
          /*
           * Der Server liefert einen Proxy, der gleichzeitig
           * MessageEndpoint und TCPMessageListener ist:
           */
          endpoint = (MessageEndpoint) factory.createEndpoint(null);
          try {
            // signalisiert den Beginn einer Transaktion
            endpoint.beforeDelivery(getMessageEndpointMethod());
            // versucht die Message zuzustellen, dabei können auch fachspezifisch
            // Informationen der ActivationSpec verwendet werden
            ((TCPMessageListener)endpoint).onMessage(activationSpec.getMessage() + message);
          } catch (NoSuchMethodException e) {
            // ... Fehlerbehandlung hier...
          } catch (ResourceException e) {
            // ... Fehlerbehandlung hier...
          } finally {
            // signalisiert das Ende einer Transaktion
            try {
              endpoint.afterDelivery();
            } catch (ResourceException e) {
              // signalisiert das Ende einer Transaktion
            }
          }
        } catch (UnavailableException e) {
          // kann sein, dass der Endpoint inzwischen deaktiviert wurde
          // ... Fehlerbehandlung hier...
        } finally {
          // der endpoint muss unbedingt wieder freigegeben werden
          if (endpoint != null) {
            endpoint.release();
          }
        }
      }
    }
  }
}

ServerWork repräsentiert den eigentlichen Service Thread, in diesem Beispiel wird ein ServerSocket gestartet und auf Anfragen eines Telnet-Clients gelauscht:

/**
 * Repräsentiert den eigentlichen Service Thread.
 */
final class ServerWork implements Work {
  //
  private final ServerSocket service;
  private final TCPResourceAdapter resourceAdapter;
  /**
   * Der konstruktor ist der korrekte Ort für das Binden aller Ressourcen.
   * 
   * @param workManager
   * @param factoryMap
   * @param port
   * 
   * @throws IOException Wenn der ServerSocket nicht erstellt werden kann...
   */
  ServerWork(final TCPResourceAdapter resourceAdapter) 
      throws IOException {
    this.resourceAdapter = resourceAdapter;
    // Startet einen ServerSocket
    this.service = new ServerSocket(resourceAdapter.getPort());
  }
  //
  public void run() {
    try {
      while (!service.isClosed()) {
        try {
          final Socket socket = service.accept();
          /*
           * Statt die Arbeit hier direkt zu erledigen, werden
           * Work-Einheiten über den work manager gestartet. Damit
           * wird dieser Workerthread nicht zu lange von seiner
           * eigentlichen Aufgabe (das Lauschen auf Ereignissen)
           * abgehalten.
           */
          try {
            if (!service.isClosed() && socket != null && socket.isConnected()) {
              // Wenn ein Request eintrifft, wird sein Inhalt nebenläufig verarbeitet
              Work requestProcessor = new MessageWork(socket, this); 
              resourceAdapter.getWorkManager().scheduleWork(requestProcessor);
            }
          } catch (WorkException e) {
            // well..
          }
        } catch (IOException e) {
          // well..
        }
      }
    } catch (Exception e) {
      // Fehlerbehandlung für alle Fehler außer WorkException oder 
      // IOException in der Hauptschleife 
    } finally {
      // Auf alle Fälle ServerSocket schließen
      release();
    }
  }
  /**
   * Freigabe des Worker Threads.
   * 
   * Wird gerufen, wenn der RA gestoppt wird {@link TCPResourceAdapter()}
   * oder wenn der WorkManager entscheidet, diesen Thread zu stoppen.    
   */
  public void release() {
    if (this.service != null) {
      try {
        // ServerSocket close ist synchronisiert
        this.service.close();
      } catch (IOException e) {
        // well...
      }
    }
  }
  //
  TCPResourceAdapter getResourceAdapter() {
    return this.resourceAdapter;
  }
}
Die eigentliche Abarbeitung der Client-Anfragen erfolgt nebenläufig in MessageWork-Instanzen, die nur für die Dauer einer Client-Session aktiv sind:
/**
 * Eine Work-Implementierung zur Abarbeitung eines einzelenen EIS Client Requests.
 * 
 * Die Implementierung dient Testzwecken und erlaubt einem Telnet Client
 * eine Verbindung. Das Protokoll der Sitzung: es werden alle Eingaben entgegen
 * genommen, bis ein ENTER kommt. Dann wird die Zeile als Nachricht an alle 
 * registrierten Endpoints geschickt und als Echo an den Telnet-Client gesendet.
 * 
 * Die Verbindung und dieser Worker Thread werden abgebrochen, wenn der Telnet-Client
 * "exit" als Zeile eingibt.
 * 
 * In der Praxis würde in dieser Klasse die korrekte Abarbeitung des EIS-Protokolls
 * sowie die Extrahierung von Nachrichten imlpementiert werden. Normalerweise kommt
 * auch hier die Java API des EIS zum Einsatz.
 */
final class MessageWork implements Work {
  //
  private final ServerWork serverWork;
  private final Socket socket;
  //
  MessageWork(final Socket socket, final ServerWork serverWork) {
    this.serverWork = serverWork;
    this.socket = socket;
  }
  //
  public void run() {
    // extrahiere den Inhalt des Requests:
    InputStream inputStream = null;
    OutputStream outputStream = null;
    try {
      // der Client bekommt nun noch eine Antwort:
      inputStream = this.socket.getInputStream();
      outputStream = this.socket.getOutputStream();
      //
      // hier nur das Echo der Anfrage und Weitergabe an die MDB
      StringBuilder sb = new StringBuilder();
      int c = 13;
      boolean exit = false;
      while (!exit && (c = inputStream.read()) != -1) {
        // wir interpretieren 13 (ENTER) als Ende der Eingabe
        sb.append((char) c);
        if (c == 13) {
          String message = sb.toString().trim();
          // exit heißt Verbindung abbrechen!
          if ("exit".equals(message)) {
            exit = true;
          } else {
            outputStream.write(("> " + message + "\n\r").getBytes());
            outputStream.flush();
            // Liefere dem ResourceAdapter die Nachricht, er
            // wird alle MessageEndpoints benachtrichtigen
            this.serverWork.getResourceAdapter().sendMessage(message);
            // Platz für die nächste Nachricht
            sb.setLength(0);
          }
        }
      }
    } catch (IOException e) {
      e.printStackTrace();
    } finally {
      if (outputStream != null) {
        try {
          outputStream.close();
        } catch (IOException e) {
          // well...
        }
      }
      if (inputStream != null) {
        try {
          inputStream.close();
        } catch (IOException e) {
          // well...
        }
      }
      release();
    }
  }
  //
  public void release() {
    if (this.socket != null) {
      try {
        this.socket.close();
      } catch (IOException e) {
        // well..
      }
    }
  }
}

Der Inbound Resource Adapter Deploymentdescriptor


<?xml version="1.0" encoding="UTF-8"?>
<connector xmlns="http://java.sun.com/xml/ns/j2ee"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://java.sun.com/xml/ns/j2ee
  http://java.sun.com/xml/ns/j2ee/connector_1_5.xsd" version="1.5">
  :
  <display-name>TCP Inbound Adapter</display-name>
  <vendor-name>TCP Solutions, Inc.</vendor-name>
  <eis-type>TCP Request Receiver</eis-type>
  <resourceadapter-version>1.0</resourceadapter-version>
  :
  <resourceadapter>
    <resourceadapter-class>
      com.eis.tcp.TCPResourceAdapter
    </resourceadapter-class>
    :
    <!-- ResourceAdapter default configuration properties -->
    <config-property>
      <config-property-name>Port</config-property-name>
      <config-property-type>java.lang.Integer</config-property-type>
      <config-property-value>6767</config-property-value>
    </config-property>
    :
    <!-- The inbound resource adapter description -->
    <inbound-resourceadapter>
      <messageadapter>
        <messagelistener>
          <messagelistener-type>
            com.eis.tcp.TCPMessageListener
          </messagelistener-type>
          <activationspec>
            <activationspec-class>
              com.eis.tcp.TCPActivationSpec
            </activationspec-class>
            <required-config-property>
              <config-property-name>
                Message
              </config-property-name>
            </required-config-property>
          </activationspec>
        </messagelistener>
      </messageadapter>
    </inbound-resourceadapter>
  </resourceadapter>
</connector>

Der fertig deployte Adapter im Glassfish:

Bereitstellung eines Endpoints


Bis hier wird der deployte Adapter korrekt funktionieren aber keinen Effekt zeigen. Es muss ein MessageEndpoint angemeldet werden. Dazu implementiert eine Message Driven Bean eines der vom Adapter bereitgestellten MessageListener Interfaces. Der Deploymentdescriptor der MDB deklariert dieses MessageListener Interface und stellt die Konfigurationsparameter für die ActivationSpec Implementierung bereit:

ejb-jar.xml:
:
<?xml version="1.0" encoding="UTF-8"?>
<ejb-jar version="2.1" xmlns="http://java.sun.com/xml/ns/j2ee"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://java.sun.com/xml/ns/j2ee
  http://java.sun.com/xml/ns/j2ee/ejb-jar_2_1.xsd">
  :
  <display-name>TCP Resource Adapter Test MDB</display-name>
  :
  <enterprise-beans>
    <!-- a message driven descriptor -->
    <message-driven>
      <display-name>TCPListenerBean</display-name>
      <ejb-name>TCPListenerBean</ejb-name>
      <ejb-class>com.mdb.TCPListenerBean</ejb-class>
      <!-- The message listener interface -->
      <messaging-type>com.eis.tcp.TCPMessageListener</messaging-type>
      <transaction-type>Container</transaction-type>
      <!-- the values for the Activation Spec JavaBean -->
      <activation-config>
        <activation-config-property>
          <activation-config-property-name>
            Message
          </activation-config-property-name>
          <activation-config-property-value>
            Eine Nachricht über den TCP Stack: 
          </activation-config-property-value>
        </activation-config-property>
      </activation-config>
    </message-driven>
  </enterprise-beans>
  :
  <assembly-descriptor>
    <container-transaction>
      <method>
        <ejb-name>TCPListenerBean</ejb-name>
        <method-name>*</method-name>
      </method>
      <trans-attribute>NotSupported</trans-attribute>
    </container-transaction>
  </assembly-descriptor>
  :
</ejb-jar>
Etwas fehlt noch: laut Spezifikation muss der Deployer noch (serverspezifisch) eine Assoziation zum Adapter deklarieren. In Suns Java System Application Server (GlassFish) geschieht das in der sun-ejb-jar.xml der MDB:
<sun-ejb-jar>
  <enterprise-beans>
    <ejb>
      <ejb-name>TCPListenerBean</ejb-name>
      <mdb-resource-adapter>
        <resource-adapter-mid>tcpInboundRA</resource-adapter-mid>
      </mdb-resource-adapter>
    </ejb>
  </enterprise-beans>
</sun-ejb-jar>
Die fertig deployte MDB im Glassfish:

Test


Für den Test startet man eine Telnet-Session (localhost, Port nach Konfiguration) und tippt Zeilen ein. Jede mit ENTER abgeschlossenen Zeile sollte nun als Echo an zurückkehren und allen Message Endpoints zugestellt werden.

Transaction Inflow Model


Der Transaction Inflow Contract eines Inbound Resource Adapter erlaubt es, den transaktionalen Kontext eingehender Nachrichten an den Container weiterzuleiten. Aus Sicht des EIS ist dann der Java EE Container eine XA-fähige Ressource und beherrscht two phase commit. Für die Implementierung muss der Adapterentwickler die API dieses Kontrakts (siehe dazu JCA Spec, Abschnitt 14.4: Transaction Inflow Model) und das Transaktion-Protokoll des EIS detailliert kennen.

Soll der Adapter Arbeit im Kontext einer vom EIS verantworteten Transaktion ausführen, muss das EIS bei seine Transaktion identifizieren. Diese Identifikation wird dann mittels einer javax.transaction.xa.Xid gekapselt und bei allen die Transaktion betreffenden Aktionen an den Container weitergereicht. Grob sieht dann der Ablauf so aus:

1. Das EIS startet unter dem Dach einer Transaktion (die über gleiche javax.transaction.xa.Xids ) ein oder mehrere Aufgaben in Form von Workern:

// lese Informationen aus dem EIS Aufruf und erzeuge xid und work:
Work work = ...
Xid xid = ...
// führe die Arbeit aus
bootstrapContext.getWorkManager().startWork(work, xid);
2. Das EIS signalisiert das Ende einer Transaktion
Xid xid = ...
// TX Prepare (optional)
try {
  bootstrapContext.getXATerminator().prepare(xid);
} catch(XAException e) {
  // EIS beanchrichtigen: prepare failed
}
3. Das EIS beendet die Transaktion
Xid xid = ...
// TX Commit / Rollback
try {
  if (commit) {
    bootstrapContext.getXATerminator().commit(xid);
  } else {
    bootstrapContext.getXATerminator().rollback(xid);
  }
} catch(XAException e) {
  // EIS beanchrichtigen: commit / roolback failed
}
copyright © 2002-2018 | Dr. Christian Dürr | prozesse-und-systeme.de | all rights reserved