ONJava.com -- The Independent Source for Enterprise Java
oreilly.comSafari Books Online.Conferences.

advertisement

AddThis Social Bookmark Button

Developing a Simple JMS Example
Pages: 1, 2, 3, 4

The TopicConnection

The TopicConnection is created by the TopicConnectionFactory:

// Look up a JMS connection factory
TopicConnectionFactory conFactory = 
(TopicConnectionFactory)jndi.lookup("TopicConnectionFactory");
                
// Create a JMS connection
TopicConnection connection = 
conFactory.createTopicConnection(username, password);

The actual physical network connection may or may not be unique depending on the vendor. However, the connection is considered to be logically unique so authentication and connection control can be managed separately from other connections.

The TopicConnection represents a connection to the message server. Each TopicConnection that is created from a TopicConnectionFactory is a unique connection to the server. A JMS client might choose to create multiple connections from the same connection factory, but this is rare as connections are relatively expensive (each connection requires a network socket, I/O streams, memory, etc.). Creating multiple Session objects (discussed later in this chapter) from the same connection is considered more efficient, because sessions share access to the same connection. The TopicConnection is an interface that extends javax.jms.Connection interface. It defines several general-purpose methods used by clients of the TopicConnection. Among these methods are the start( ), stop( ), and close( ) methods:



// javax.jms.Connection the super interface
public interface Connection {
    public void start( ) throws JMSException; 
    public void stop( ) throws JMSException; 
    public void close( ) throws JMSException;
    ...
}

// javax.jms.TopicConnection extends javax.jms.Connection
public interface TopicConnection extends Connection {
    public TopicSession createTopicSession(boolean transacted, 
                                           int acknowledgeMode) 
    throws JMSException;
    ...
}

The start( ), stop( ), and close( ) methods allow a client to manage the connection directly. The start( ) method turns the inbound flow of messages "on," allowing messages to be received by the client. This method is used at the end of the constructor in Chat class:

   ...
   // Intialize the Chat application
   set(connection, pubSession, subSession, publisher, username);

   <kbd class=userinput>connection.start( );</kbd>
        
}

It is a good idea to start the connection after the subscribers have been set up, because the messages start to flow in from the topic as soon as start( ) is invoked.

The stop( ) method blocks the flow of inbound messages until the start( ) method is invoked again. The close( ) method is used to close the TopicConnection to the message server. This should be done when a client is finished using the TopicConnection; closing the connection conserves resources on the client and server. In the Chat class, the main( ) method calls Chat.close( ) when "exit" is typed at the command line. The Chat.close( ) method in turn calls the TopicConnection.close( ) method:

Thread-specific storage is used with the Java Authentication and Authorization Service ( JAAS) to allow security credentials to transparently propagate between resources and applications.

public void close( ) throws JMSException {
   connection.close( );
}

Closing a TopicConnection closes all the objects associated with the connection including the TopicSession, TopicPublisher, and TopicSubscriber.

The TopicSession

After the TopicConnection is obtained, it's used to create TopicSession objects:

// Create a JMS connection
TopicConnection connection =
conFactory.createTopicConnection(username,password);
 
// Create two JMS session objects
TopicSession pubSession =
connection.createTopicSession(false,
                              Session.AUTO_ACKNOWLEDGE);
TopicSession subSession =
connection.createTopicSession(false,
                              Session.AUTO_ACKNOWLEDGE);

A TopicSession object is a factory for creating Message, TopicPublisher, and TopicSubscriber objects. A client can create multiple TopicSession objects to provide more granular control over publishers, subscribers, and their associated transactions. In this case we create two TopicSession objects, pubSession and subSession. We need two objects because of threading restrictions in JMS, which are discussed in the "Sessions and Threading" section later in the chapter.

The boolean parameter in the createTopicSession( ) method indicates whether the Session object will be transacted. A transacted Session automatically manages outgoing and incoming messages within a transaction. Transactions are important but not critical to our discussion at this time, so the parameter is set to false, which means the TopicSession will not be transacted. Transactions are discussed in more detail in Chapter 6, Guaranteed Messaging, Transactions, Acknowledgments, and Failures.

The second parameter indicates the acknowledgment mode used by the JMS client. An acknowledgment is a notification to the message server that the client has received the message. In this case we chose AUTO_ACKNOWLEDGE, which means that the message is automatically acknowledged after it is received by the client.

The TopicSession objects are used to create the TopicPublisher and TopicSubscriber. The TopicPublisher and TopicSubscriber objects are created with a Topic identifier and are dedicated to the TopicSession that created them; they operate under the control of a specific TopicSession:

TopicPublisher publisher = 
    pubSession.createPublisher(chatTopic);
TopicSubscriber subscriber = 
    subSession.createSubscriber(chatTopic);

The TopicSession is also used to create the Message objects that are delivered to the topic. The pubSession is used to create Message objects in the writeMessage( ) method. When you type text at the command line, the main( ) method reads the text and passes it to the Chat instance by invoking writeMessage( ). The writeMessage( ) method (shown in the following example) uses the pubSession object to generate a TextMessage object that can be used to deliver the text to the topic:

protected void writeMessage(String text) throws JMSException{
   TextMessage message = pubSession.createTextMessage( );
   message.setText(username+" : "+text);
   publisher.publish(message);
}

Several Message types can be created by a TopicSession. The most commonly used type is the TextMessage.

The Topic

JNDI is used to locate a Topic object, which is an administered object like the TopicConnectionFactory :

InitialContext jndi = new InitialContext(env);
.
.
// Look up a JMS topic
Topic chatTopic = (Topic)jndi.lookup(topicName);

A Topic object is a handle or identifier for an actual topic, called a physical topic, on the messaging server. A physical topic is an electronic channel to which many clients can subscribe and publish. A topic is analogous to a news group or list server: when a message is sent to a news group or list server, it is delivered to all the subscribers. Similarly, when a JMS client delivers a Message object to a topic, all the clients subscribed to that topic receive the Message.

The Topic object encapsulates a vendor-specific name for identifying a physical topic in the messaging server. The Topic object has one method, getName( ), which returns the name identifier for the physical topic it represents. The name encapsulated by a Topic object is vendor-specific and varies from product to product. For example, one vendor might use dot (".") separated topic names, like "oreilly.jms.chat", while another vendor might use a completely different naming system, similar to LDAP naming, "o=oreilly,cn=chat". Using topic names directly will result in client applications that are not portable across brands of JMS servers. The Topic object hides the topic name from the client, making the client more portable.

As a convention, we'll refer to a physical topic as a topic and only use the term "physical topic" when it's important to stress its difference from a Topic object.

The TopicPublisher

A TopicPublisher was created using the pubSession and the chatTopic:

// Look up a JMS topic
Topic chatTopic = (Topic)jndi.lookup(topicName);
                    
// Create a JMS publisher and subscriber
TopicPublisher publisher = 
    pubSession.createPublisher(chatTopic);

A TopicPublisher is used to deliver messages to a specific topic on a message server. The Topic object used in the createPublisher( ) method identifies the topic that will receive messages from the TopicPublisher. In the Chat example, any text typed on the command line is passed to the Chat class's writeMessage( ) method. This method uses the TopicPublisher to deliver a message to the topic:

protected void writeMessage(String text) throws JMSException{
   TextMessage message = pubSession.createTextMessage( );
   message.setText(username+" : "+text);
   publisher.publish(message);
}

The TopicPublisher objects deliver messages to the topic asynchronously. Asynchronous delivery and consumption of messages is a key characteristic of Message-Oriented Middleware; the TopicPublisher doesn't block or wait until all the subscribers receive the message. Instead, it returns from the publish( ) method as soon as the message server receives the message. It's up to the message server to deliver the message to all the subscribers for that topic.

The TopicSubscriber

The TopicSubscriber is created using the subSession and the chatTopic:

// Look up a JMS topic
Topic chatTopic = (Topic)jndi.lookup(topicName);

// Create a JMS publisher and subscriber                
TopicPublisher publisher = 
    pubSession.createPublisher(chatTopic);
TopicSubscriber subscriber = 
    subSession.createSubscriber(chatTopic);

A TopicSubscriber receives messages from a specific topic. The Topic object argument used in the createSubscriber( ) method identifies the topic from which the TopicSubscriber will receive messages.

The TopicSubscriber receives messages from the message server one at a time (serially). These messages are pushed from the message server to the TopicSubscriber asynchronously, which means that the TopicSubscriber does not have to poll the message server for messages. In our example, each chat client will receive any message published by any of the other chat clients. When a user enters text at the command line, the text message is delivered to all other chat clients that subscribe to the same topic.

Although the in-process event model used by TopicSubscriber is similar to the one used in Java beans, JMS itself is an API and the interfaces it defines are not Java beans.

The pub/sub messaging model in JMS includes an in-process Java event model for handling incoming messages. This is similar to the event-driven model used by Java beans. An object simply implements the listener interface, in this case the MessageListener, and then is registered with the TopicSubscriber. A TopicSubscriber may have only one MessageListener object. Here is the definition of the MessageListener interface used in JMS:

package javax.jms;

public interface MessageListener {
    public void onMessage(Message message);
}

When the TopicSubscriber receives a message from its topic, it invokes the onMessage( ) method of its MessageListener objects. The Chat class itself implements the MessageListener interface and implements the onMessage( ) method:

public class Chat <kbd class=userinput>implements
        javax.jms.MessageListener</kbd>{
    ...
    public void <kbd class=userinput>onMessage</kbd>(Message message){
        try{
            TextMessage textMessage = (TextMessage)message;
            String text = textMessage.getText( );
            System.out.println(text);
        } catch (JMSException jmse){jmse.printStackTrace( );}
    }
    ...
}

The Chat class is a MessageListener type, and therefore registers itself with the TopicSubscriber in its constructor:

TopicSubscriber subscriber = subSession.createSubscriber(chatTopic);

subscriber.setMessageListener(this);

When the message server pushes a message to the TopicSubscriber, the TopicSubscriber invokes the Chat object's onMessage( ) method.

It's fairly easy to confuse the Java Message Service with its use of a Java event model. JMS is an API for asynchronous distributed enterprise messaging that spans processes and machines across a network. The Java event model is used to synchronously deliver events by invoking methods on one or more objects in the same process that have registered as listeners. The JMS pub/sub model uses the Java event model so that a TopicSubscriber can notify its MessageListener object in the same process that a message has arrived from the message server.

The Message

In the chat example, the TextMessage class is used to encapsulate the messages we send and receive. A TextMessage contains a java.lang.String as its body and is the most commonly used message type. The onMessage( ) method receives TextMessage objects from the TopicSubscriber. Likewise, the writeMessage( ) method creates and publishes TextMessage objects using the TopicPublisher:

public void onMessage(Message message){
   try{
      TextMessage textMessage = (TextMessage)message;
      String text = textMessage.getText( );
      System.out.println(text);
   } catch (JMSException jmse){jmse.printStackTrace( );}
}
protected void writeMessage(String text) throws JMSException{
   TextMessage message = pubSession.createTextMessage( );
   message.setText(username+" : "+text);
   publisher.publish(message);
}

A message basically has two parts: a header and payload. The header is comprised of special fields that are used to identify the message, declare attributes of the message, and provide information for routing. The difference between message types is determined largely by their payload, i.e., the type of application data the message contains. The Message class, which is the superclass of all message objects, has no payload. It is a lightweight message that delivers no payload but can serve as a simple event notification. The other message types have special payloads that determine their type and use:

Message
This type has no payload. It is useful for simple event notification.
TextMessage
This type carries a java.lang.String as its payload. It is useful for exchanging simple text messages and also for more complex character data, such as XML documents.
ObjectMessage
This type carries a serializable Java object as its payload. It's useful for exchanging Java objects.
BytesMessage
This type carries an array of primitive bytes as its payload. It's useful for exchanging data in an application's native format, which may not be compatible with other existing Message types. It is also useful where JMS is used purely as a transport between two systems, and the message payload is opaque to the JMS client.
StreamMessage
This type carries a stream of primitive Java types (int, double, char, etc.) as its payload. It provides a set of convenience methods for mapping a formatted stream of bytes to Java primitives. It's an easy programming model when exchanging primitive application data in a fixed order.
MapMessage
This type carries a set of name-value pairs as its payload. The payload is similar to a java.util.Properties object, except the values must be Java primitives or their wrappers. The MapMessage is useful for delivering keyed data.

Sessions and Threading

The Chat application uses a separate session for the publisher and subscriber, pubSession and subSession, respectively. This is due to a threading restriction imposed by JMS. According to the JMS specification, a session may not be operated on by more than one thread at a time. In our example, two threads of control are active: the default main thread of the Chat application and the thread that invokes the onMessage( ) handler. The thread that invokes the onMessage( ) handler is owned by the JMS provider. Since the invocation of the onMessage( ) handler is asynchronous, it could be called while the main thread is publishing a message in the writeMessage( ) method. If both the publisher and subscriber had been created by the same session, the two threads could operate on these methods at the same time; in effect, they could operate on the same TopicSession concurrently -- a condition that is prohibited.

A goal of the JMS specification was to avoid imposing an internal architecture on the JMS provider. Requiring a JMS provider's implementation of a Session object to be capable of safely handling multiple threads was specifically avoided. This is mostly due to one of the intended uses of JMS--that the JMS API be a wrapper around an existing messaging system, which may not have multithreaded delivery capabilities on the client.

The requirement imposed on the JMS provider is that the sending of messages and the asynchronous receiving of messages be processed serially. It is possible to publish-and-subscribe using the same session, but only if the application is publishing from within the onMessage( ) handler. An example of this will be covered in Chapter 4.


Return to ONJava.com.